Code Examples
Complete working examples extracted from the AKM Ping and Chat applications.
1. Building the AKM Header
Every message to AppEndpoint requires a 12-byte header with start-of-message marker, version, and length validation.
import struct
# Protocol constants
AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5
AKM_MAJOR_VERSION = 1
AKM_MINOR_VERSION = 0
# Header format: SOM (4) + Version (2) + Length (2) + ~Length (2) + Reserved (2)
FMT_AKM_HEADER = "<LHHH2s"  # 12 bytes little-endian
AKM_HEADER_SIZE = struct.calcsize(FMT_AKM_HEADER)  # 12


def build_akm_header(payload_len):
    """
    Pack the fixed-size AKM APP API header that precedes every payload.
    Args:
        payload_len: Number of payload bytes that will follow the header
    Returns:
        bytes: The packed 12-byte header
    """
    # Major version occupies the high byte, minor version the low byte.
    packed_version = AKM_MINOR_VERSION | (AKM_MAJOR_VERSION << 8)
    # 16-bit complement of the length, used by the receiver for validation.
    length_check = 0xFFFF & ~payload_len
    header_fields = (
        AKM_APPENDPOINT_APP_API_SOM,  # Start of message marker
        packed_version,               # Protocol version
        payload_len,                  # Payload length
        length_check,                 # Length complement
        b"\x00\x00",                  # Reserved bytes
    )
    return struct.pack(FMT_AKM_HEADER, *header_fields)
2. Socket Connection
Establish a TCP connection to AppEndpoint and implement reliable data reception.
import socket
def connect_to_appendpoint(ip="127.0.0.1", port=49600):
    """
    Connect to AppEndpoint over TCP.
    Args:
        ip: AppEndpoint IP address
        port: AppEndpoint port (default 49600)
    Returns:
        socket: Connected socket
    Raises:
        OSError: If the connection cannot be established
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((ip, port))
    except BaseException:
        # Do not leak the file descriptor when the connect attempt fails.
        sock.close()
        raise
    return sock
def recv_exact(sock, size):
    """
    Read from the socket until exactly `size` bytes have been collected.
    A single recv() call may return fewer bytes than requested, so
    binary protocols with fixed-size fields need to loop like this.
    Args:
        sock: Socket to read from
        size: Number of bytes to read
    Returns:
        bytes: Exactly `size` bytes
    Raises:
        ConnectionError: If socket closes before all data received
    """
    pieces = []
    remaining = size
    while remaining > 0:
        piece = sock.recv(remaining)
        if len(piece) == 0:
            # An empty read means the peer closed the connection.
            raise ConnectionError("Socket closed")
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)
# Usage: open the TCP connection that the later examples reuse as `sock`
sock = connect_to_appendpoint(ip="192.168.1.100", port=49600)
print("[*] Connected to AppEndpoint")
3. Application Registration
Register your application with an ATR and unique App Value before sending data.
import struct
import threading
REGISTER_APP_REQUEST = 0x00000004
REGISTER_APP_RESPONSE = 0x00000044
STRUCT_REGISTER_REQUEST = "<LHH"  # opcode + ATR_ID + App_Value
# Global state for registration
register_wait_event = threading.Event()
register_success = False


def build_register_request(atrid, appvalue):
    """
    Assemble a complete registration request (header followed by payload).
    Args:
        atrid: ATR abbreviated ID (U16)
        appvalue: Unique application value (U16)
    Returns:
        bytes: Complete packet with header
    """
    fields = (REGISTER_APP_REQUEST, atrid, appvalue)
    body = struct.pack(STRUCT_REGISTER_REQUEST, *fields)
    header = build_akm_header(len(body))
    return header + body
def do_register(sock, atrid, appvalue, timeout=2.0):
    """
    Send a registration request and wait for its outcome.
    The response itself is read by the receive thread, which records
    the result in `register_success` and sets `register_wait_event`.
    Args:
        sock: Connected socket
        atrid: ATR ID (can be hex string like "0x0001" or int)
        appvalue: Application value (unique per ATR)
        timeout: Seconds to wait for response
    Returns:
        bool: True if registration successful
    """
    # Accept the ATR ID as a hex string as well as an int.
    atr_num = int(atrid, 16) if isinstance(atrid, str) else atrid
    register_wait_event.clear()
    request = build_register_request(atr_num, appvalue)
    sock.sendall(request)
    # Block until the receive thread signals a response or the timeout expires.
    answered = register_wait_event.wait(timeout)
    if not (answered and register_success):
        print("[ERROR] Registration failed")
        return False
    print(f"[*] Registered: ATR=0x{atr_num:04X}, App={appvalue}")
    return True
4. Sending Data
Send data to another application on the same ATR.
MAX_APP_BUFFER_SIZE = 4046
SEND_APP_DATA_REQUEST = 0x00000005
STRUCT_SEND_DATA_REQUEST = f"<LHH{MAX_APP_BUFFER_SIZE}sL"


def build_app_data_packet(message, atrid, target_appvalue):
    """
    Build a data packet to send to another application.
    The encoded message is carried in a fixed-size, zero-padded buffer of
    MAX_APP_BUFFER_SIZE bytes, followed by the number of meaningful bytes.
    A message whose encoding exceeds the buffer is truncated to fit.
    Args:
        message: String message to send
        atrid: ATR ID for the connection
        target_appvalue: Target application's App Value
    Returns:
        bytes: Complete packet with header
    """
    # Clamp before packing: struct's "s" code silently truncates an
    # over-long value, so the length field must be taken from the clamped
    # data or it would claim more bytes than the buffer actually carries.
    data = message.encode()[:MAX_APP_BUFFER_SIZE]
    padded = data.ljust(MAX_APP_BUFFER_SIZE, b"\x00")
    # Build payload
    payload = struct.pack(
        STRUCT_SEND_DATA_REQUEST,
        SEND_APP_DATA_REQUEST,
        atrid,
        target_appvalue,
        padded,
        len(data),  # Actual (possibly truncated) length
    )
    return build_akm_header(len(payload)) + payload
# Send a message
def send_message(sock, atrid, target_app, message):
    """Build a data packet for `target_app`, transmit it, and log the send."""
    sock.sendall(build_app_data_packet(message, atrid, target_app))
    print(f"[TX] Sent to App={target_app}: {message}")
5. Receive Loop Pattern
Threaded receive loop to handle incoming messages asynchronously.
import threading
RECEIVE_APP_DATA_RESPONSE = 0x00000046


def receive_loop(sock, stop_event, on_message=None):
    """
    Background thread to receive messages from AppEndpoint.
    Each iteration reads one AKM header, validates it, reads the payload,
    and dispatches on the response code. Registration responses update
    `register_success` and wake `register_wait_event`; data responses are
    printed and passed to `on_message`. Payloads too short for their
    response code are reported and skipped. An invalid header or a closed
    socket ends the loop.
    Args:
        sock: Connected socket
        stop_event: threading.Event to signal shutdown (checked between messages)
        on_message: Callback for received messages (atrid, appvalue, message)
    """
    global register_success
    # Minimum payload sizes required by the unpack calls below.
    opcode_size = 4
    register_resp_size = 12                    # opcode + "<LHH"
    app_data_size = 12 + MAX_APP_BUFFER_SIZE   # opcode + "<HH" + buffer + "<L"
    try:
        while not stop_event.is_set():
            # 1. Read AKM header (12 bytes)
            hdr = recv_exact(sock, AKM_HEADER_SIZE)
            som, _version, msg_len, msg_len_comp, _ = struct.unpack(
                FMT_AKM_HEADER, hdr
            )
            # 2. Validate header
            if som != AKM_APPENDPOINT_APP_API_SOM:
                raise ValueError(f"Invalid SOM: 0x{som:08X}")
            if msg_len != (~msg_len_comp & 0xFFFF):
                raise ValueError("Length complement mismatch")
            # 3. Read full payload
            payload = recv_exact(sock, msg_len)
            if len(payload) < opcode_size:
                # Framing is still intact (header was valid), so skip the
                # frame instead of letting struct.error end the thread.
                print(f"[ERROR] Receive thread: payload too short ({len(payload)} bytes)")
                continue
            resp_code = struct.unpack_from("<L", payload, 0)[0]
            # 4. Handle response types
            if resp_code == REGISTER_APP_RESPONSE:
                if len(payload) < register_resp_size:
                    print("[ERROR] Receive thread: truncated register response")
                    continue
                conf, _, _ = struct.unpack_from("<LHH", payload, 4)
                # A zero confirmation code is treated as success.
                register_success = (conf == 0)
                register_wait_event.set()
            elif resp_code == RECEIVE_APP_DATA_RESPONSE:
                if len(payload) < app_data_size:
                    print("[ERROR] Receive thread: truncated data response")
                    continue
                # Parse incoming data
                atrid, appv = struct.unpack_from("<HH", payload, 4)
                buf = payload[8:8 + MAX_APP_BUFFER_SIZE]
                length = struct.unpack_from("<L", payload, 8 + MAX_APP_BUFFER_SIZE)[0]
                # Only the first `length` bytes of the padded buffer are data.
                message = buf[:length].decode(errors="ignore")
                print(f"[RX] From ATR=0x{atrid:04X} App={appv}: {message}")
                if on_message:
                    on_message(atrid, appv, message)
    except Exception as e:
        # Errors during shutdown are expected (e.g. socket closed); stay quiet then.
        if not stop_event.is_set():
            print(f"[ERROR] Receive thread: {e}")
# Start the receive thread (daemon, so it does not block interpreter exit)
stop_event = threading.Event()
rx_thread = threading.Thread(
    target=receive_loop,
    kwargs={"sock": sock, "stop_event": stop_event},
    daemon=True,
)
rx_thread.start()
6. Complete PING/ACK Flow
A complete example showing how to implement a PING/ACK protocol between two applications.
import time
PING_TIMEOUT_SEC = 2.0
# Tracking state
ack_event = threading.Event()
packets_sent = 0
packets_received = 0
packets_lost = 0
latencies = []


def handle_message(atrid, appv, message):
    """
    Handle incoming messages - respond to PINGs, track ACKs.
    A PING is answered with an ACK carrying the same UID, sent on the
    module-level `sock` to the source application named in the UID
    (format ATRID:SrcApp:TargetApp:Seq). A malformed PING is reported and
    ignored so that a bad frame cannot raise out of the receive callback.
    Args:
        atrid: ATR ID the message arrived on
        appv: App Value reported with the message
        message: Decoded message text
    """
    global packets_received
    if message.startswith("PING|"):
        # Received a PING - the UID is the second "|"-separated field
        uid = message.split("|")[1]
        uid_fields = uid.split(":")
        if len(uid_fields) != 4:
            print(f"[ERROR] Malformed PING ignored: {message}")
            return
        try:
            src_app = int(uid_fields[1])
        except ValueError:
            print(f"[ERROR] Malformed PING ignored: {message}")
            return
        seq = uid_fields[3]
        print(f"[RX] PING seq={seq} from App={src_app}")
        # Send ACK back to source
        sock.sendall(
            build_app_data_packet(f"ACK|{uid}", atrid, src_app)
        )
    elif message.startswith("ACK|"):
        # Received ACK for our PING - wake the sender waiting in do_ping
        packets_received += 1
        ack_event.set()
def do_ping(sock, atrid, my_appvalue, target_appvalue, count=10):
    """
    Send PING packets and wait for ACK responses.
    Counters and latencies are reset at the start of every run, each
    round trip is timed with the monotonic clock, and a summary is
    printed at the end. ACKs are signalled through `ack_event`.
    Args:
        sock: Connected socket
        atrid: ATR ID
        my_appvalue: Our application value
        target_appvalue: Target application value
        count: Number of pings to send
    """
    global packets_sent, packets_lost, packets_received
    # Reset every counter, including received, so a second run does not
    # report ACKs left over from the previous one.
    packets_sent = packets_lost = packets_received = 0
    latencies.clear()
    print(f"\nPING ATR=0x{atrid:04X} -> App={target_appvalue}\n")
    for seq in range(1, count + 1):
        # Create unique ID for this ping
        uid = f"{atrid:04X}:{my_appvalue}:{target_appvalue}:{seq}"
        ack_event.clear()
        # Send PING; use the monotonic clock so wall-clock adjustments
        # cannot distort (or make negative) the measured round trip.
        start = time.monotonic()
        timestamp = int(start * 1_000_000)
        sock.sendall(
            build_app_data_packet(
                f"PING|{uid}|{timestamp}",
                atrid,
                target_appvalue
            )
        )
        packets_sent += 1
        # Wait for ACK
        if ack_event.wait(PING_TIMEOUT_SEC):
            rtt = (time.monotonic() - start) * 1000
            latencies.append(rtt)
            print(f"64 bytes seq={seq} time={rtt:.2f} ms")
        else:
            packets_lost += 1
            print(f"Request timeout for seq {seq}")
        time.sleep(1)  # Wait between pings
    # Print statistics (guard the divisions for a run that sent nothing)
    avg = sum(latencies) / len(latencies) if latencies else 0
    loss_pct = (packets_lost / packets_sent * 100) if packets_sent else 0
    print("\n--- ping statistics ---")
    print(f"{packets_sent} transmitted, {packets_received} received, "
          f"{loss_pct:.0f}% loss")
    print(f"rtt avg={avg:.2f} ms")
7. Chat Application Pattern
Pattern from the AKM Chat application showing message delivery with acknowledgment.
import threading
import time
# Seconds a sent chat message may stay unacknowledged before a delivery
# timeout is reported.
CHAT_ACK_TIMEOUT = 1.0
# Maximum size in bytes of an encoded chat message; longer messages are
# truncated to this before sending.
# NOTE(review): the reason for the 16-byte margin below the buffer size is not shown here - confirm.
SAFE_APP_PAYLOAD = MAX_APP_BUFFER_SIZE - 16
class ChatClient:
    """
    Chat endpoint that sends messages and tracks their delivery ACKs.
    Every outgoing message carries a UID (ATRID:SrcApp:TargetApp:Seq).
    The UID stays in `pending` until the peer's ACK arrives or
    CHAT_ACK_TIMEOUT elapses, whichever happens first.
    """

    def __init__(self, sock, atrid, my_appvalue):
        """
        Args:
            sock: Connected socket
            atrid: ATR ID used for all messages
            my_appvalue: Our application value (the SrcApp part of UIDs)
        """
        self.sock = sock
        self.atrid = atrid
        self.my_appvalue = my_appvalue
        self.seq = 0
        self.pending = {}  # UID -> timestamp
        self.lock = threading.Lock()  # guards self.pending

    def send_chat(self, text, target_appvalue):
        """
        Send a chat message with delivery confirmation.
        Args:
            text: Message text
            target_appvalue: Target application's App Value
        """
        self.seq += 1
        # UID format: ATRID:SrcApp:TargetApp:Seq
        uid = f"{self.atrid:04X}:{self.my_appvalue}:{target_appvalue}:{self.seq}"
        # Build message (truncate if too long)
        raw = f"CHAT|{uid}|{text}".encode()[:SAFE_APP_PAYLOAD]
        # The byte-level cut can land inside a multi-byte UTF-8 character;
        # drop the partial tail instead of raising UnicodeDecodeError.
        wire_text = raw.decode(errors="ignore")
        # Track pending delivery
        with self.lock:
            self.pending[uid] = time.time()
        # Send message
        try:
            self.sock.sendall(
                build_app_data_packet(wire_text, self.atrid, target_appvalue)
            )
        except Exception:
            # Nothing was sent, so no ACK can arrive: forget the entry.
            with self.lock:
                self.pending.pop(uid, None)
            raise
        # Start timeout thread
        threading.Thread(
            target=self._wait_ack,
            args=(uid,),
            daemon=True
        ).start()

    def _wait_ack(self, uid):
        """Wait for ACK, report timeout if not received."""
        time.sleep(CHAT_ACK_TIMEOUT)
        with self.lock:
            # Still pending after the wait means no ACK removed it.
            timed_out = self.pending.pop(uid, None) is not None
        if timed_out:
            print(f"[!] Delivery timeout for message {uid}")

    def handle_incoming(self, atrid, appv, message):
        """
        Handle incoming chat messages and ACKs.
        Malformed CHAT frames are reported and ignored rather than raised,
        so a bad frame cannot propagate out of the receive callback.
        Args:
            atrid: ATR ID the message arrived on
            appv: App Value reported with the message
            message: Decoded message text
        """
        if message.startswith("CHAT|"):
            # Received chat message: CHAT|<uid>|<text>
            parts = message.split("|", 2)
            if len(parts) != 3:
                print(f"[ERROR] Malformed CHAT ignored: {message}")
                return
            _, uid, text = parts
            print(f"[Peer] {text}")
            # Send ACK back - use source app from UID
            uid_fields = uid.split(":")
            if len(uid_fields) != 4:
                print(f"[ERROR] Malformed CHAT UID, no ACK sent: {uid}")
                return
            try:
                src_app = int(uid_fields[1])
            except ValueError:
                print(f"[ERROR] Malformed CHAT UID, no ACK sent: {uid}")
                return
            self.sock.sendall(
                build_app_data_packet(f"ACK|{uid}", atrid, src_app)
            )
        elif message.startswith("ACK|"):
            # Received delivery confirmation
            uid = message.split("|")[1]
            with self.lock:
                delivered = self.pending.pop(uid, None) is not None
            if delivered:
                print("[+] Delivered")
Complete Source Code
For complete working applications, see the source code at:
/opt/bluewave/mako/akm/AKM_APPS/AKM_PING_APP/akm_App_Ping.py
/opt/bluewave/mako/akm/AKM_APPS/AKM_CHAT_APP/akm_App_Chat.py