AMM Integration

AMM HMI API Integration

The AMM (AKM Management Module) provides a binary TCP interface for endpoint provisioning and trust relationship management. This guide shows how to connect directly to AMM using the HMI (Human-Machine Interface) API.

Connection: TCP port 49200 - Binary protocol with message headers

Protocol Overview

Message Header (12 bytes)

All messages to/from AMM are prefixed with a 12-byte header:

Offset Size Field Value
0 4 Start of Message 0xA55A5AA5
4 1 Major Version 0x01
5 1 Minor Version 0x00
6 2 Message Length Payload size (U16, little-endian)
8 2 Length Complement ~Message_Length & 0xFFFF
10 2 Reserved 0x0000

Request Types

Code Name Response
0x01 LIST_ENDPOINTS_REQUEST 0x41
0x02 GET_ENDPOINT_DETAILS_REQUEST 0x42
0x03 PROVISION_ENDPOINT_REQUEST 0x43
0x04 CREATE_OPERATIONAL_ATR_REQUEST 0x44
0x05 LIST_ATRS_REQUEST 0x45
0x06 DELETE_ENDPOINT_REQUEST 0x46
0x07 DELETE_ATR_REQUEST 0x47

Connection Setup

Connect to AMM using a TCP socket on port 49200:

Connection Setup
import socket
import struct

AMM_PORT = 49200
AMM_HOST = "172.31.7.13"
AMM_HMI_SOM = 0xA55A5AA5
MAX_ENDPOINTS = 32
MAX_ATRS = 64

class AMMClient:
    def __init__(self, host: str, port: int = AMM_PORT):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def close(self):
        self.sock.close()

    def _build_header(self, payload_len: int) -> bytes:
        """Build 12-byte AMM message header."""
        length_comp = (~payload_len) & 0xFFFF
        return struct.pack(
            "<IBBHHH",
            AMM_HMI_SOM,  # SOM
            0x01,         # Major version
            0x00,         # Minor version
            payload_len,
            length_comp,
            0x0000        # Reserved
        )

    def _recv_exact(self, size: int) -> bytes:
        """Receive exactly size bytes."""
        data = b""
        while len(data) < size:
            chunk = self.sock.recv(size - len(data))
            if not chunk:
                raise ConnectionError("Connection closed")
            data += chunk
        return data

# Connect to AMM
client = AMMClient(AMM_HOST)
print("[*] Connected to AMM")

List Endpoints

Query AMM for all provisioned endpoints:

List Endpoints
# Request codes
LIST_ENDPOINTS_REQUEST = 0x01
ENDPOINT_LIST_RESPONSE = 0x41

def list_endpoints(self, filter_type: int = 0) -> list:
    """List all endpoints from AMM."""
    # Build request: header + api_request (4) + filter (4)
    payload = struct.pack("<II", LIST_ENDPOINTS_REQUEST, filter_type)
    request = self._build_header(len(payload)) + payload

    self.sock.sendall(request)

    # Response: header(12) + api_response(4) + confirmation(4) + num(4) + endpoints(32*16)
    response = self._recv_exact(12 + 4 + 4 + 4 + MAX_ENDPOINTS * 16)

    # Parse header
    som, = struct.unpack("<I", response[0:4])
    if som != AMM_HMI_SOM:
        raise ValueError(f"Invalid SOM: 0x{som:08X}")

    # Parse payload
    api_response, confirmation, num_endpoints = struct.unpack("<III", response[12:24])

    endpoints = []
    offset = 24
    for i in range(min(num_endpoints, MAX_ENDPOINTS)):
        lower64, upper64 = struct.unpack("<QQ", response[offset:offset+16])
        endpoints.append({"lower64": lower64, "upper64": upper64})
        offset += 16

    return endpoints

# Usage
client = AMMClient(AMM_HOST)
endpoints = client.list_endpoints()
print(f"Found {len(endpoints)} endpoints:")
for i, ep in enumerate(endpoints):
    print(f"  [{i}] 0x{ep['lower64']:016X}")

List ATRs

Query AMM for all trust relationships:

List ATRs
LIST_ATRS_REQUEST = 0x05
ATR_LIST_RESPONSE = 0x45

ATR_CATEGORIES = {
    0: "Illegal",
    1: "Bootstrap",
    2: "Provisional",
    3: "Operational"
}

def list_atrs(self, filter_type: int = 0) -> list:
    """List all ATRs from AMM."""
    payload = struct.pack("<II", LIST_ATRS_REQUEST, filter_type)
    request = self._build_header(len(payload)) + payload

    self.sock.sendall(request)

    # Response: header(12) + api_response(4) + confirmation(4) + num(4) + atrs(64*20)
    # Each ATR: category(4) + lower64(8) + upper64(8) = 20 bytes
    response = self._recv_exact(12 + 4 + 4 + 4 + MAX_ATRS * 20)

    som, = struct.unpack("<I", response[0:4])
    if som != AMM_HMI_SOM:
        raise ValueError(f"Invalid SOM: 0x{som:08X}")

    api_response, confirmation, num_atrs = struct.unpack("<III", response[12:24])

    atrs = []
    offset = 24
    for i in range(min(num_atrs, MAX_ATRS)):
        category, lower64, upper64 = struct.unpack("<IQQ", response[offset:offset+20])
        atrs.append({
            "category": category,
            "category_name": ATR_CATEGORIES.get(category, "Unknown"),
            "lower64": lower64,
            "upper64": upper64
        })
        offset += 20

    return atrs

# Usage
atrs = client.list_atrs()
print(f"Found {len(atrs)} ATRs:")
for i, atr in enumerate(atrs):
    print(f"  [{i}] {atr['category_name']} - 0x{atr['lower64']:016X}")

Key Data Structures

Endpoint Identifier (16 bytes)

0-7 lower64 (U64)
8-15 upper64 (U64)

ATR Categories

0 Illegal Value
1 Bootstrap Provisional
2 Provisional
3 Operational

Next Steps