AMM HMI API Integration
The AMM (AKM Management Module) provides a binary TCP interface for endpoint provisioning and trust relationship management. This guide shows how to connect directly to AMM using the HMI (Human-Machine Interface) API.
Connection: TCP port 49200 - Binary protocol with message headers
Protocol Overview
Message Header (12 bytes)
All messages to/from AMM are prefixed with a 12-byte header:
| Offset | Size | Field | Value |
|---|---|---|---|
| 0 | 4 | Start of Message | 0xA55A5AA5 |
| 4 | 1 | Major Version | 0x01 |
| 5 | 1 | Minor Version | 0x00 |
| 6 | 2 | Message Length | Payload size (U16, little-endian) |
| 8 | 2 | Length Complement | ~Message_Length & 0xFFFF |
| 10 | 2 | Reserved | 0x0000 |
Request Types
| Code | Name | Response |
|---|---|---|
0x01 |
LIST_ENDPOINTS_REQUEST | 0x41 |
0x02 |
GET_ENDPOINT_DETAILS_REQUEST | 0x42 |
0x03 |
PROVISION_ENDPOINT_REQUEST | 0x43 |
0x04 |
CREATE_OPERATIONAL_ATR_REQUEST | 0x44 |
0x05 |
LIST_ATRS_REQUEST | 0x45 |
0x06 |
DELETE_ENDPOINT_REQUEST | 0x46 |
0x07 |
DELETE_ATR_REQUEST | 0x47 |
Connection Setup
Connect to AMM using a TCP socket on port 49200:
Connection Setup
import socket
import struct
AMM_PORT = 49200
AMM_HOST = "172.31.7.13"
AMM_HMI_SOM = 0xA55A5AA5
MAX_ENDPOINTS = 32
MAX_ATRS = 64
class AMMClient:
def __init__(self, host: str, port: int = AMM_PORT):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((host, port))
def close(self):
self.sock.close()
def _build_header(self, payload_len: int) -> bytes:
"""Build 12-byte AMM message header."""
length_comp = (~payload_len) & 0xFFFF
return struct.pack(
"<IBBHHH",
AMM_HMI_SOM, # SOM
0x01, # Major version
0x00, # Minor version
payload_len,
length_comp,
0x0000 # Reserved
)
def _recv_exact(self, size: int) -> bytes:
"""Receive exactly size bytes."""
data = b""
while len(data) < size:
chunk = self.sock.recv(size - len(data))
if not chunk:
raise ConnectionError("Connection closed")
data += chunk
return data
# Connect to AMM
client = AMMClient(AMM_HOST)
print("[*] Connected to AMM")
List Endpoints
Query AMM for all provisioned endpoints:
List Endpoints
# Request codes
LIST_ENDPOINTS_REQUEST = 0x01
ENDPOINT_LIST_RESPONSE = 0x41
def list_endpoints(self, filter_type: int = 0) -> list:
"""List all endpoints from AMM."""
# Build request: header + api_request (4) + filter (4)
payload = struct.pack("<II", LIST_ENDPOINTS_REQUEST, filter_type)
request = self._build_header(len(payload)) + payload
self.sock.sendall(request)
# Response: header(12) + api_response(4) + confirmation(4) + num(4) + endpoints(32*16)
response = self._recv_exact(12 + 4 + 4 + 4 + MAX_ENDPOINTS * 16)
# Parse header
som, = struct.unpack("<I", response[0:4])
if som != AMM_HMI_SOM:
raise ValueError(f"Invalid SOM: 0x{som:08X}")
# Parse payload
api_response, confirmation, num_endpoints = struct.unpack("<III", response[12:24])
endpoints = []
offset = 24
for i in range(min(num_endpoints, MAX_ENDPOINTS)):
lower64, upper64 = struct.unpack("<QQ", response[offset:offset+16])
endpoints.append({"lower64": lower64, "upper64": upper64})
offset += 16
return endpoints
# Usage
client = AMMClient(AMM_HOST)
endpoints = client.list_endpoints()
print(f"Found {len(endpoints)} endpoints:")
for i, ep in enumerate(endpoints):
print(f" [{i}] 0x{ep['lower64']:016X}")
List ATRs
Query AMM for all trust relationships:
List ATRs
LIST_ATRS_REQUEST = 0x05
ATR_LIST_RESPONSE = 0x45
ATR_CATEGORIES = {
0: "Illegal",
1: "Bootstrap",
2: "Provisional",
3: "Operational"
}
def list_atrs(self, filter_type: int = 0) -> list:
"""List all ATRs from AMM."""
payload = struct.pack("<II", LIST_ATRS_REQUEST, filter_type)
request = self._build_header(len(payload)) + payload
self.sock.sendall(request)
# Response: header(12) + api_response(4) + confirmation(4) + num(4) + atrs(64*20)
# Each ATR: category(4) + lower64(8) + upper64(8) = 20 bytes
response = self._recv_exact(12 + 4 + 4 + 4 + MAX_ATRS * 20)
som, = struct.unpack("<I", response[0:4])
if som != AMM_HMI_SOM:
raise ValueError(f"Invalid SOM: 0x{som:08X}")
api_response, confirmation, num_atrs = struct.unpack("<III", response[12:24])
atrs = []
offset = 24
for i in range(min(num_atrs, MAX_ATRS)):
category, lower64, upper64 = struct.unpack("<IQQ", response[offset:offset+20])
atrs.append({
"category": category,
"category_name": ATR_CATEGORIES.get(category, "Unknown"),
"lower64": lower64,
"upper64": upper64
})
offset += 20
return atrs
# Usage
atrs = client.list_atrs()
print(f"Found {len(atrs)} ATRs:")
for i, atr in enumerate(atrs):
print(f" [{i}] {atr['category_name']} - 0x{atr['lower64']:016X}")
Key Data Structures
Endpoint Identifier (16 bytes)
| 0-7 | lower64 (U64) |
| 8-15 | upper64 (U64) |
ATR Categories
0 |
Illegal Value |
1 |
Bootstrap Provisional |
2 |
Provisional |
3 |
Operational |