AppEndpoint Integration
This guide explains how to integrate your application with AKM through the AppEndpoint's APP API.
Applications communicate with AppEndpoint via a TCP socket connection on port 49600.
Connection Setup
Connect to AppEndpoint using a standard TCP socket:
import socket
# Open an IPv4 TCP stream socket and connect it to the AppEndpoint
# listener on the loopback address, port 49600.
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.connect(("127.0.0.1", 49600))
print("[*] Connected to AppEndpoint")
APP API Header Format
All messages to/from AppEndpoint must be prefixed with a 12-byte header. All multi-byte fields are little-endian:
| Offset | Size | Field | Value |
|---|---|---|---|
| 0 | 4 | SOM (Start of Message) | 0xA55A5AA5 |
| 4 | 2 | Version | (Major << 8) \| Minor |
| 6 | 2 | Length | Payload length (U16) |
| 8 | 2 | ~Length | Bitwise complement of length |
| 10 | 2 | Reserved | 0x0000 |
import struct
# Start-of-message marker carried in the first four bytes of every header.
AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5

# Protocol version, stored in the header as (major << 8) | minor.
AKM_MAJOR_VERSION = 1
AKM_MINOR_VERSION = 0

# Little-endian header layout: SOM (u32), version (u16), length (u16),
# ~length (u16), two reserved bytes.
FMT_AKM_HEADER = "<LHHH2s"  # 12 bytes
AKM_HEADER_SIZE = struct.Struct(FMT_AKM_HEADER).size  # 12
def build_akm_header(payload_len):
    """Return the packed 12-byte AKM header announcing ``payload_len`` bytes.

    The header holds the start-of-message marker, the protocol version,
    the payload length, the 16-bit complement of that length, and two
    zeroed reserved bytes, laid out according to ``FMT_AKM_HEADER``.
    """
    reserved = b"\x00\x00"
    fields = (
        AKM_APPENDPOINT_APP_API_SOM,
        (AKM_MAJOR_VERSION << 8) | AKM_MINOR_VERSION,
        payload_len,
        # Complement masked to 16 bits so it fits the unsigned "H" field.
        ~payload_len & 0xFFFF,
        reserved,
    )
    return struct.pack(FMT_AKM_HEADER, *fields)
Registration Flow
Before sending data, your application must register with an ATR ID and a unique App Value. The App Value identifies your application within the ATR and must be unique among all applications registered to the same ATR.
Important: Each application must use a unique App Value. Two applications on the same ATR cannot share the same App Value.
# Opcode placed at the start of a registration request payload.
REGISTER_APP_REQUEST = 0x00000004
# NOTE(review): presumably the opcode of AppEndpoint's reply to a registration
# request; nothing in this guide reads it — confirm.
REGISTER_APP_RESPONSE = 0x00000044
# Little-endian layout of the registration request payload.
STRUCT_REGISTER_REQUEST = "<LHH" # opcode (4) + ATR_ID (2) + App_Value (2)
def build_register_request(atrid, appvalue):
    """Return a complete registration message: AKM header plus payload.

    The payload is the REGISTER_APP_REQUEST opcode followed by the ATR ID
    and the App Value, packed according to ``STRUCT_REGISTER_REQUEST``.
    """
    body = struct.pack(STRUCT_REGISTER_REQUEST, REGISTER_APP_REQUEST, atrid, appvalue)
    header = build_akm_header(len(body))
    return header + body
# Example: register as App Value 100 on ATR ID 0x0001.
packet = build_register_request(atrid=0x0001, appvalue=100)
sock.sendall(packet)
Sending Data
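Once registered, use SEND_APP_DATA_REQUEST to send data to another application on the same ATR. The payload always carries a fixed MAX_APP_BUFFER_SIZE (4046-byte) data buffer, zero-padded, followed by the actual data length: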
# Size in bytes of the fixed data buffer carried by every data message.
MAX_APP_BUFFER_SIZE = 4046
# Opcode placed at the start of a send-data request payload.
SEND_APP_DATA_REQUEST = 0x00000005
# Little-endian payload layout: opcode (u32), ATR ID (u16), App Value (u16),
# fixed-size data buffer, actual data length (u32).
STRUCT_SEND_DATA_REQUEST = "<LHH" + str(MAX_APP_BUFFER_SIZE) + "sL"
def build_app_data_packet(message, atrid, target_appvalue):
    """Build a data packet to send to another application.

    Args:
        message: Text to send; encoded with ``str.encode()`` (UTF-8).
        atrid: ATR ID written into the request.
        target_appvalue: App Value of the receiving application.

    Returns:
        The AKM header followed by the SEND_APP_DATA_REQUEST payload.

    Raises:
        ValueError: If the encoded message is longer than
            ``MAX_APP_BUFFER_SIZE`` bytes.
    """
    data = message.encode()
    if len(data) > MAX_APP_BUFFER_SIZE:
        # The "Ns" struct code silently truncates oversized input, which
        # would leave the length field larger than the buffer it describes.
        raise ValueError(
            f"Message is {len(data)} bytes; maximum is {MAX_APP_BUFFER_SIZE}"
        )
    payload = struct.pack(
        STRUCT_SEND_DATA_REQUEST,
        SEND_APP_DATA_REQUEST,
        atrid,
        target_appvalue,
        data,  # "Ns" pads short input with zero bytes up to the buffer size
        len(data),
    )
    return build_akm_header(len(payload)) + payload
# Send "Hello" to the application registered as App Value 200 on ATR 0x0001.
packet = build_app_data_packet("Hello", atrid=0x0001, target_appvalue=200)
sock.sendall(packet)
Receiving Data
Incoming data arrives asynchronously as RECEIVE_APP_DATA_RESPONSE messages. Use a receive thread to handle incoming data:
import threading
# Response code that marks an incoming message as application data.
RECEIVE_APP_DATA_RESPONSE = 0x00000046
def recv_exact(sock, size):
    """Receive exactly `size` bytes from socket.

    Args:
        sock: Connected socket-like object exposing ``recv(n)``.
        size: Number of bytes to read.

    Returns:
        A ``bytes`` object of length ``size`` (empty when ``size`` <= 0).

    Raises:
        ConnectionError: If ``recv`` returns no data before ``size`` bytes
            have arrived, i.e. the peer closed the connection.
    """
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("Socket closed")
        chunks.append(chunk)
        remaining -= len(chunk)
    # Join once at the end instead of re-copying a growing bytes object
    # every time a partial read arrives.
    return b"".join(chunks)
def receive_loop(sock, stop_event):
    """Background thread to receive data from AppEndpoint.

    Reads framed messages from ``sock`` until ``stop_event`` is set and
    prints every RECEIVE_APP_DATA_RESPONSE it sees. Messages carrying any
    other response code are read and discarded.

    Args:
        sock: Connected socket, passed through to ``recv_exact``.
        stop_event: ``threading.Event``-like object, checked before each
            message is read.

    Raises:
        ValueError: On a bad SOM, a length/complement mismatch, or a payload
            too short for the fields being decoded.
        OSError: If reading from the socket fails while ``stop_event`` is
            not set. This includes the ``ConnectionError`` that
            ``recv_exact`` raises when the peer closes the connection.
    """
    # Payload layout after the 4-byte response code: ATR ID (u16),
    # App Value (u16), fixed-size buffer, actual data length (u32).
    app_data_fmt = f"<HH{MAX_APP_BUFFER_SIZE}sL"
    app_data_end = 4 + struct.calcsize(app_data_fmt)

    while not stop_event.is_set():
        try:
            payload = _read_akm_payload(sock)
        except OSError:
            # The read blocks inside recv(), so a stop request is usually
            # noticed only when the socket is shut down or closed. Treat
            # that as a normal exit once a stop has been requested.
            if stop_event.is_set():
                break
            raise

        if len(payload) < 4:
            raise ValueError("Payload too short for response code")
        (resp_code,) = struct.unpack_from("<L", payload, 0)
        if resp_code != RECEIVE_APP_DATA_RESPONSE:
            continue

        if len(payload) < app_data_end:
            raise ValueError("RECEIVE_APP_DATA_RESPONSE payload too short")
        atrid, appv, buf, length = struct.unpack_from(app_data_fmt, payload, 4)
        # Slicing past the end of the buffer is harmless, so an oversized
        # length field simply yields the whole buffer.
        message = buf[:length].decode(errors="ignore")
        print(f"[RX] From ATR=0x{atrid:04X} App={appv}: {message}")


def _read_akm_payload(sock):
    """Read one framed message from ``sock`` and return its payload bytes."""
    hdr = recv_exact(sock, AKM_HEADER_SIZE)
    som, _version, msg_len, msg_len_comp, _reserved = struct.unpack(FMT_AKM_HEADER, hdr)
    if som != AKM_APPENDPOINT_APP_API_SOM:
        raise ValueError(f"Invalid SOM: 0x{som:08X}")
    if msg_len != (~msg_len_comp & 0xFFFF):
        raise ValueError("Length complement mismatch")
    return recv_exact(sock, msg_len)
# Run the receiver on a daemon thread; set stop_event to ask the loop to
# finish before it reads its next message.
stop_event = threading.Event()
rx_thread = threading.Thread(
    target=receive_loop,
    args=(sock, stop_event),
    daemon=True,
)
rx_thread.start()
Protocol Constants
# AKM Header
AKM_APPENDPOINT_APP_API_SOM = 0xA55A5AA5  # start-of-message marker (header offset 0)
AKM_MAJOR_VERSION = 1  # header version field is (major << 8) | minor
AKM_MINOR_VERSION = 0
AKM_HEADER_SIZE = 12  # bytes; equals struct.calcsize("<LHHH2s")
# Buffer sizes
MAX_APP_BUFFER_SIZE = 4046  # bytes in the fixed data buffer of a data message
MAX_APP_PAYLOAD_SIZE = 4084  # NOTE(review): not used by the examples in this guide — confirm what it bounds
TOTAL_ATRS_ALLOWED = 10  # NOTE(review): presumably the maximum number of ATRs — confirm
# Request codes
GET_ENDPOINT_INFO_REQUEST = 0x00000001
GET_ATR_INFO_REQUEST = 0x00000002
GET_VERSION_REQUEST = 0x00000003
REGISTER_APP_REQUEST = 0x00000004
SEND_APP_DATA_REQUEST = 0x00000005
# Response codes
# NOTE(review): each listed response code equals a request code + 0x40, and
# 0x00000045 is not listed — confirm whether a send-data response exists.
GET_ENDPOINT_INFO_RESPONSE = 0x00000041
GET_ATR_INFO_RESPONSE = 0x00000042
GET_VERSION_RESPONSE = 0x00000043
REGISTER_APP_RESPONSE = 0x00000044
RECEIVE_APP_DATA_RESPONSE = 0x00000046