AppEndpoint Integration

AppEndpoint Integration

This guide explains how to integrate your application with AKM through the AppEndpoint's APP API. Applications communicate with AppEndpoint via a TCP socket connection on port 49600.

Connection Setup

Connect to AppEndpoint using a standard TCP socket:

Socket Connection
import socket

# Connect to AppEndpoint
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("127.0.0.1", 49600))
print("[*] Connected to AppEndpoint")

APP API Header Format

All messages to/from AppEndpoint must be prefixed with a 12-byte header:

Offset Size Field Value
0 4 SOM (Start of Message) 0xA55A5AA5
4 2 Version (Major << 8) | Minor
6 2 Length Payload length (U16)
8 2 ~Length Bitwise complement of length
10 2 Reserved 0x0000
Build AKM Header
import struct

AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5
AKM_MAJOR_VERSION = 1
AKM_MINOR_VERSION = 0

FMT_AKM_HEADER = "<LHHH2s"   # 12 bytes
AKM_HEADER_SIZE = struct.calcsize(FMT_AKM_HEADER)  # 12

def build_akm_header(payload_len):
    """Build the 12-byte AKM header for a message."""
    version = (AKM_MAJOR_VERSION << 8) | AKM_MINOR_VERSION
    msg_len = payload_len
    msg_len_comp = (~msg_len) & 0xFFFF
    return struct.pack(
        FMT_AKM_HEADER,
        AKM_APPENDPOINT_APP_API_SOM,
        version,
        msg_len,
        msg_len_comp,
        b"\x00\x00"
    )

Registration Flow

Before sending data, your application must register with an ATR ID and unique App Value. The App Value identifies your application within the ATR and must be unique among all applications registered to the same ATR.

Important: Each application must use a unique App Value. Two applications on the same ATR cannot share the same App Value.

Register Request
REGISTER_APP_REQUEST = 0x00000004
REGISTER_APP_RESPONSE = 0x00000044
STRUCT_REGISTER_REQUEST = "<LHH"  # opcode (4) + ATR_ID (2) + App_Value (2)

def build_register_request(atrid, appvalue):
    """Build a registration request packet."""
    payload = struct.pack(
        STRUCT_REGISTER_REQUEST,
        REGISTER_APP_REQUEST,
        atrid,
        appvalue
    )
    return build_akm_header(len(payload)) + payload

# Example: Register with ATR ID 0x0001 and App Value 100
packet = build_register_request(0x0001, 100)
sock.sendall(packet)

Sending Data

Once registered, use SEND_APP_DATA_REQUEST to send data to another application on the same ATR:

Send Data
MAX_APP_BUFFER_SIZE = 4046
SEND_APP_DATA_REQUEST = 0x00000005
STRUCT_SEND_DATA_REQUEST = f"<LHH{MAX_APP_BUFFER_SIZE}sL"

def build_app_data_packet(message, atrid, target_appvalue):
    """Build a data packet to send to another application."""
    data = message.encode()
    padded = data.ljust(MAX_APP_BUFFER_SIZE, b"\x00")

    payload = struct.pack(
        STRUCT_SEND_DATA_REQUEST,
        SEND_APP_DATA_REQUEST,
        atrid,
        target_appvalue,
        padded,
        len(data)
    )

    return build_akm_header(len(payload)) + payload

# Send "Hello" to App Value 200 on ATR 0x0001
packet = build_app_data_packet("Hello", 0x0001, 200)
sock.sendall(packet)

Receiving Data

Incoming data arrives asynchronously as RECEIVE_APP_DATA_RESPONSE messages. Use a receive thread to handle incoming data:

Receive Loop
import threading

RECEIVE_APP_DATA_RESPONSE = 0x00000046

def recv_exact(sock, size):
    """Receive exactly `size` bytes from socket."""
    data = b""
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("Socket closed")
        data += chunk
    return data

def receive_loop(sock, stop_event):
    """Background thread to receive data from AppEndpoint."""
    while not stop_event.is_set():
        # Read AKM header (12 bytes)
        hdr = recv_exact(sock, AKM_HEADER_SIZE)
        som, version, msg_len, msg_len_comp, _ = struct.unpack(FMT_AKM_HEADER, hdr)

        # Validate header
        if som != AKM_APPENDPOINT_APP_API_SOM:
            raise ValueError(f"Invalid SOM: 0x{som:08X}")
        if msg_len != (~msg_len_comp & 0xFFFF):
            raise ValueError("Length complement mismatch")

        # Read payload
        payload = recv_exact(sock, msg_len)
        resp_code = struct.unpack("<L", payload[:4])[0]

        if resp_code == RECEIVE_APP_DATA_RESPONSE:
            atrid, appv = struct.unpack("<HH", payload[4:8])
            buf = payload[8:8 + MAX_APP_BUFFER_SIZE]
            length = struct.unpack("<L", payload[8 + MAX_APP_BUFFER_SIZE:12 + MAX_APP_BUFFER_SIZE])[0]
            message = buf[:length].decode(errors="ignore")

            print(f"[RX] From ATR=0x{atrid:04X} App={appv}: {message}")

# Start receive thread
stop_event = threading.Event()
rx_thread = threading.Thread(target=receive_loop, args=(sock, stop_event), daemon=True)
rx_thread.start()

Protocol Constants

Constants
# AKM Header
AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5
AKM_MAJOR_VERSION = 1
AKM_MINOR_VERSION = 0
AKM_HEADER_SIZE = 12

# Buffer sizes
MAX_APP_BUFFER_SIZE = 4046
MAX_APP_PAYLOAD_SIZE = 4084
TOTAL_ATRS_ALLOWED = 10

# Request codes
GET_ENDPOINT_INFO_REQUEST = 0x00000001
GET_ATR_INFO_REQUEST = 0x00000002
GET_VERSION_REQUEST = 0x00000003
REGISTER_APP_REQUEST = 0x00000004
SEND_APP_DATA_REQUEST = 0x00000005

# Response codes
GET_ENDPOINT_INFO_RESPONSE = 0x00000041
GET_ATR_INFO_RESPONSE = 0x00000042
GET_VERSION_RESPONSE = 0x00000043
REGISTER_APP_RESPONSE = 0x00000044
RECEIVE_APP_DATA_RESPONSE = 0x00000046

Next Steps