Examples

Code Examples

Complete working examples extracted from the AKM Ping and Chat applications.

1. Building the AKM Header

Every message to AppEndpoint requires a 12-byte header with start-of-message marker, version, and length validation.

akm_header.py
import struct

# Protocol constants
AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5
AKM_MAJOR_VERSION = 1
AKM_MINOR_VERSION = 0

# Header format: SOM (4) + Version (2) + Length (2) + ~Length (2) + Reserved (2)
FMT_AKM_HEADER = "<LHHH2s"   # 12 bytes little-endian
AKM_HEADER_SIZE = struct.calcsize(FMT_AKM_HEADER)  # 12

def build_akm_header(payload_len):
    """
    Build the 12-byte AKM APP API header.

    Args:
        payload_len: Length of the payload that follows the header

    Returns:
        bytes: 12-byte header
    """
    version = (AKM_MAJOR_VERSION << 8) | AKM_MINOR_VERSION
    msg_len = payload_len
    msg_len_comp = (~msg_len) & 0xFFFF  # Bitwise complement for validation

    return struct.pack(
        FMT_AKM_HEADER,
        AKM_APPENDPOINT_APP_API_SOM,  # Start of message marker
        version,                       # Protocol version
        msg_len,                       # Payload length
        msg_len_comp,                  # Length complement (for validation)
        b"\x00\x00"                    # Reserved bytes
    )

2. Socket Connection

Establish a TCP connection to AppEndpoint and implement reliable data reception.

connection.py
import socket

def connect_to_appendpoint(ip="127.0.0.1", port=49600):
    """
    Connect to AppEndpoint.

    Args:
        ip: AppEndpoint IP address
        port: AppEndpoint port (default 49600)

    Returns:
        socket: Connected socket
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    return sock

def recv_exact(sock, size):
    """
    Receive exactly `size` bytes from socket.

    This is critical for binary protocols where you need
    to read exact amounts of data.

    Args:
        sock: Socket to read from
        size: Number of bytes to read

    Returns:
        bytes: Exactly `size` bytes

    Raises:
        ConnectionError: If socket closes before all data received
    """
    data = b""
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("Socket closed")
        data += chunk
    return data

# Usage
sock = connect_to_appendpoint("192.168.1.100", 49600)
print("[*] Connected to AppEndpoint")

3. Application Registration

Register your application with an ATR and unique App Value before sending data.

registration.py
import struct
import threading

REGISTER_APP_REQUEST = 0x00000004
REGISTER_APP_RESPONSE = 0x00000044
STRUCT_REGISTER_REQUEST = "<LHH"  # opcode + ATR_ID + App_Value

# Global state for registration
register_wait_event = threading.Event()
register_success = False

def build_register_request(atrid, appvalue):
    """
    Build a registration request packet.

    Args:
        atrid: ATR abbreviated ID (U16)
        appvalue: Unique application value (U16)

    Returns:
        bytes: Complete packet with header
    """
    payload = struct.pack(
        STRUCT_REGISTER_REQUEST,
        REGISTER_APP_REQUEST,
        atrid,
        appvalue
    )
    return build_akm_header(len(payload)) + payload

def do_register(sock, atrid, appvalue, timeout=2.0):
    """
    Register with AppEndpoint.

    Args:
        sock: Connected socket
        atrid: ATR ID (can be hex string like "0x0001" or int)
        appvalue: Application value (unique per ATR)
        timeout: Seconds to wait for response

    Returns:
        bool: True if registration successful
    """
    global register_success

    # Parse ATR ID if hex string
    if isinstance(atrid, str):
        atrid = int(atrid, 16)

    register_wait_event.clear()
    sock.sendall(build_register_request(atrid, appvalue))

    # Wait for response (handled by receive thread)
    if register_wait_event.wait(timeout) and register_success:
        print(f"[*] Registered: ATR=0x{atrid:04X}, App={appvalue}")
        return True
    else:
        print("[ERROR] Registration failed")
        return False

4. Sending Data

Send data to another application on the same ATR.

send_data.py
MAX_APP_BUFFER_SIZE = 4046
SEND_APP_DATA_REQUEST = 0x00000005
STRUCT_SEND_DATA_REQUEST = f"<LHH{MAX_APP_BUFFER_SIZE}sL"

def build_app_data_packet(message, atrid, target_appvalue):
    """
    Build a data packet to send to another application.

    Args:
        message: String message to send
        atrid: ATR ID for the connection
        target_appvalue: Target application's App Value

    Returns:
        bytes: Complete packet with header
    """
    # Encode and pad the data
    data = message.encode()
    padded = data.ljust(MAX_APP_BUFFER_SIZE, b"\x00")

    # Build payload
    payload = struct.pack(
        STRUCT_SEND_DATA_REQUEST,
        SEND_APP_DATA_REQUEST,
        atrid,
        target_appvalue,
        padded,
        len(data)  # Actual length
    )

    return build_akm_header(len(payload)) + payload

# Send a message
def send_message(sock, atrid, target_app, message):
    """Send a message to target application."""
    packet = build_app_data_packet(message, atrid, target_app)
    sock.sendall(packet)
    print(f"[TX] Sent to App={target_app}: {message}")

5. Receive Loop Pattern

Threaded receive loop to handle incoming messages asynchronously.

receive_loop.py
import threading

RECEIVE_APP_DATA_RESPONSE = 0x00000046

def receive_loop(sock, stop_event, on_message=None):
    """
    Background thread to receive messages from AppEndpoint.

    Args:
        sock: Connected socket
        stop_event: threading.Event to signal shutdown
        on_message: Callback for received messages (atrid, appvalue, message)
    """
    global register_success

    try:
        while not stop_event.is_set():
            # 1. Read AKM header (12 bytes)
            hdr = recv_exact(sock, AKM_HEADER_SIZE)
            som, version, msg_len, msg_len_comp, _ = struct.unpack(
                FMT_AKM_HEADER, hdr
            )

            # 2. Validate header
            if som != AKM_APPENDPOINT_APP_API_SOM:
                raise ValueError(f"Invalid SOM: 0x{som:08X}")
            if msg_len != (~msg_len_comp & 0xFFFF):
                raise ValueError("Length complement mismatch")

            # 3. Read full payload
            payload = recv_exact(sock, msg_len)
            resp_code = struct.unpack("<L", payload[:4])[0]

            # 4. Handle response types
            if resp_code == REGISTER_APP_RESPONSE:
                conf, _, _ = struct.unpack("<LHH", payload[4:12])
                register_success = (conf == 0)
                register_wait_event.set()

            elif resp_code == RECEIVE_APP_DATA_RESPONSE:
                # Parse incoming data
                atrid, appv = struct.unpack("<HH", payload[4:8])
                buf = payload[8:8 + MAX_APP_BUFFER_SIZE]
                length = struct.unpack(
                    "<L",
                    payload[8 + MAX_APP_BUFFER_SIZE:12 + MAX_APP_BUFFER_SIZE]
                )[0]

                message = buf[:length].decode(errors="ignore")
                print(f"[RX] From ATR=0x{atrid:04X} App={appv}: {message}")

                if on_message:
                    on_message(atrid, appv, message)

    except Exception as e:
        if not stop_event.is_set():
            print(f"[ERROR] Receive thread: {e}")

# Start the receive thread
stop_event = threading.Event()
rx_thread = threading.Thread(
    target=receive_loop,
    args=(sock, stop_event),
    daemon=True
)
rx_thread.start()

6. Complete PING/ACK Flow

A complete example showing how to implement a PING/ACK protocol between two applications.

ping_example.py
import time

PING_TIMEOUT_SEC = 2.0

# Tracking state
ack_event = threading.Event()
packets_sent = 0
packets_received = 0
packets_lost = 0
latencies = []

def handle_message(atrid, appv, message):
    """Handle incoming messages - respond to PINGs, track ACKs."""
    global packets_received

    if message.startswith("PING|"):
        # Received a PING - send ACK back
        _, uid, *_ = message.split("|")
        atr_hex, src_app, _, seq = uid.split(":")
        src_app = int(src_app)

        print(f"[RX] PING seq={seq} from App={src_app}")

        # Send ACK back to source
        sock.sendall(
            build_app_data_packet(f"ACK|{uid}", atrid, src_app)
        )

    elif message.startswith("ACK|"):
        # Received ACK for our PING
        packets_received += 1
        ack_event.set()

def do_ping(sock, atrid, my_appvalue, target_appvalue, count=10):
    """
    Send PING packets and wait for ACK responses.

    Args:
        sock: Connected socket
        atrid: ATR ID
        my_appvalue: Our application value
        target_appvalue: Target application value
        count: Number of pings to send
    """
    global packets_sent, packets_lost

    packets_sent = packets_lost = 0
    latencies.clear()

    print(f"\nPING ATR=0x{atrid:04X} -> App={target_appvalue}\n")

    for seq in range(1, count + 1):
        # Create unique ID for this ping
        uid = f"{atrid:04X}:{my_appvalue}:{target_appvalue}:{seq}"
        ack_event.clear()

        # Send PING
        start = time.time()
        timestamp = int(time.monotonic() * 1_000_000)
        sock.sendall(
            build_app_data_packet(
                f"PING|{uid}|{timestamp}",
                atrid,
                target_appvalue
            )
        )
        packets_sent += 1

        # Wait for ACK
        if ack_event.wait(PING_TIMEOUT_SEC):
            rtt = (time.time() - start) * 1000
            latencies.append(rtt)
            print(f"64 bytes seq={seq} time={rtt:.2f} ms")
        else:
            packets_lost += 1
            print(f"Request timeout for seq {seq}")

        time.sleep(1)  # Wait between pings

    # Print statistics
    avg = sum(latencies) / len(latencies) if latencies else 0
    print(f"\n--- ping statistics ---")
    print(f"{packets_sent} transmitted, {packets_received} received, "
          f"{(packets_lost / packets_sent * 100):.0f}% loss")
    print(f"rtt avg={avg:.2f} ms")

7. Chat Application Pattern

Pattern from the AKM Chat application showing message delivery with acknowledgment.

chat_example.py
import threading
import time

CHAT_ACK_TIMEOUT = 1.0
SAFE_APP_PAYLOAD = MAX_APP_BUFFER_SIZE - 16

class ChatClient:
    def __init__(self, sock, atrid, my_appvalue):
        self.sock = sock
        self.atrid = atrid
        self.my_appvalue = my_appvalue
        self.seq = 0
        self.pending = {}  # UID -> timestamp
        self.lock = threading.Lock()

    def send_chat(self, text, target_appvalue):
        """Send a chat message with delivery confirmation."""
        self.seq += 1

        # UID format: ATRID:SrcApp:TargetApp:Seq
        uid = f"{self.atrid:04X}:{self.my_appvalue}:{target_appvalue}:{self.seq}"

        # Build message (truncate if too long)
        payload_txt = f"CHAT|{uid}|{text}".encode()[:SAFE_APP_PAYLOAD]

        # Track pending delivery
        with self.lock:
            self.pending[uid] = time.time()

        # Send message
        self.sock.sendall(
            build_app_data_packet(
                payload_txt.decode(),
                self.atrid,
                target_appvalue
            )
        )

        # Start timeout thread
        threading.Thread(
            target=self._wait_ack,
            args=(uid,),
            daemon=True
        ).start()

    def _wait_ack(self, uid):
        """Wait for ACK, report timeout if not received."""
        time.sleep(CHAT_ACK_TIMEOUT)

        with self.lock:
            if uid in self.pending:
                self.pending.pop(uid)
                print(f"[!] Delivery timeout for message {uid}")

    def handle_incoming(self, atrid, appv, message):
        """Handle incoming chat messages and ACKs."""
        if message.startswith("CHAT|"):
            # Received chat message
            _, uid, text = message.split("|", 2)
            print(f"[Peer] {text}")

            # Send ACK back - use source app from UID
            _, src_app_str, _, _ = uid.split(":")
            src_app = int(src_app_str)

            self.sock.sendall(
                build_app_data_packet(f"ACK|{uid}", atrid, src_app)
            )

        elif message.startswith("ACK|"):
            # Received delivery confirmation
            uid = message.split("|")[1]

            with self.lock:
                if uid in self.pending:
                    self.pending.pop(uid)
                    print("[+] Delivered")

Complete Source Code

For complete working applications, see the source code at:

  • /opt/bluewave/mako/akm/AKM_APPS/AKM_PING_APP/akm_App_Ping.py
  • /opt/bluewave/mako/akm/AKM_APPS/AKM_CHAT_APP/akm_App_Chat.py