Skip to content

RobustMQ AI: AI Capability Extension Planning Based on the NATS Protocol

Background and Thinking

RobustMQ is positioned as a multi-protocol unified messaging engine — one storage layer underneath, supporting multiple consumption perspectives for MQTT, Kafka, AMQP, and other protocols on top. When planning AI-related capabilities, we faced a core question: what kind of protocol do AI Agents need for communication between themselves?

The initial idea was to design a brand-new AI-specific protocol. But that path means a new wire protocol specification, new SDKs, and a new client ecosystem — all built from scratch. For a foundational software project, the cold-start cost of a protocol ecosystem is extremely high, and this direction was rejected.

The breakthrough came from a deep analysis of the NATS protocol. The way JetStream extends NATS gave us inspiration: JetStream introduces no new wire protocol commands — all operations are completed via standard PUB/SUB. The server exposes a $JS.API.* Subject namespace; clients send JSON requests and the server returns JSON responses. All of JetStream's capabilities — Stream management, Consumer management, ACK mechanism — are application-layer extensions on top of the NATS protocol.

Exactly the same approach can be applied to AI capabilities. We can define a $mq9.AI.API.* Subject namespace on the NATS protocol and use the standard NATS Request-Reply pattern to implement AI Agent registration, discovery, invocation, and orchestration. No new protocol, no new SDK — any NATS client (Go, Rust, Python, Java, JavaScript, ...) can integrate directly.

Why NATS

Among all the protocols supported by RobustMQ, NATS is the most suitable for AI scenarios, for the following reasons:

Text Protocol, AI-Native Friendly

NATS is a pure text protocol over TCP — PUB, SUB, MSG are just a few simple text commands. LLMs can directly generate and parse this format without binary encoding or decoding. AI Agents can even interact with RobustMQ directly over a TCP connection, with zero dependencies.

By comparison, Kafka's binary protocol requires a dedicated client library; MQTT's binary frame structure requires a codec; AMQP's frame protocol is even more complex. For AI Agents that need rapid integration and lightweight communication, the advantage of a text protocol is decisive.

Request-Reply Naturally Fits the Agent Pattern

The core interaction pattern for AI Agents is "send a request, wait for a response" — send an inference task, wait for the model to return results. NATS's Request-Reply is exactly this pattern: send a message with a reply-to, and the server (or another Agent) processes it and sends the result to the reply-to address.

This pattern naturally maps to:

  • Task delegation and result return between Agents
  • Tool Use: an Agent sends a request to call an external tool and waits for the tool's result
  • Intermediate result passing in Chain of Thought
  • Request-response chains in multi-Agent collaboration

Subject Hierarchical Naming Fits Capability Routing

NATS Subjects use a .-separated hierarchical naming structure with * and > wildcard support. This naturally fits the organization and routing of AI capabilities:

ai.agent.translation.en-to-zh    # Translation Agent
ai.agent.code.review              # Code review Agent
ai.agent.code.generation          # Code generation Agent
ai.tool.search.web                # Web search tool
ai.tool.search.database            # Database query tool

A scheduler can use wildcards to listen:

ai.agent.>        # Listen to all Agents
ai.tool.*         # Listen to top-level calls for all tools
ai.agent.code.*   # Listen to all code-related Agents

Queue Group Naturally Provides Agent Load Balancing

Multiple AI Agents with the same capability join the same Queue Group, and NATS automatically distributes requests to one of them for processing. No additional load balancer or scheduler is needed.

# Three translation Agent instances join the same Queue Group
SUB ai.agent.translation.en-to-zh translators 1
SUB ai.agent.translation.en-to-zh translators 2
SUB ai.agent.translation.en-to-zh translators 3

Each translation request is handled by only one Agent, naturally achieving horizontal scaling.

Millions of TPS to Support High-Frequency Agent Communication

AI Agent clusters can communicate at very high frequencies — multi-Agent collaboration, tool call chains, real-time inference requests. NATS is designed for millions of TPS and sub-millisecond latency, fully capable of handling such loads.

Ready-made SDKs and Ecosystem

NATS officially provides client libraries for Go, Rust, Python, Java, JavaScript, C#, C, Ruby, Elixir, and more, with community implementations for 30+ languages. AI developers using any language can integrate directly with zero learning curve.

Extension Design: $mq9.AI.API.*

Drawing on JetStream's $JS.API.* pattern, we define a $mq9.AI.API.* Subject namespace on the NATS protocol for managing and invoking AI capabilities. All operations use standard NATS Request-Reply, with JSON request and response bodies.

Agent Management

# Register an Agent
PUB $mq9.AI.API.AGENT.REGISTER _INBOX.1 128\r\n
{
  "name": "translator-01",
  "type": "translation",
  "capabilities": ["en-to-zh", "zh-to-en", "en-to-ja"],
  "max_concurrency": 10,
  "model": "gpt-4",
  "metadata": {"version": "1.0", "region": "us-east"}
}\r\n

# Deregister an Agent
PUB $mq9.AI.API.AGENT.DEREGISTER.translator-01 _INBOX.2 0\r\n
\r\n

# Query Agent info
PUB $mq9.AI.API.AGENT.INFO.translator-01 _INBOX.3 0\r\n
\r\n

# List all Agents
PUB $mq9.AI.API.AGENT.LIST _INBOX.4 2\r\n
{}\r\n

# Query available Agents by capability
PUB $mq9.AI.API.AGENT.DISCOVER _INBOX.5 32\r\n
{"capability": "en-to-zh"}\r\n

Agent Invocation

# Synchronous invocation (Request-Reply)
PUB $mq9.AI.API.AGENT.INVOKE.translator-01 _INBOX.6 64\r\n
{
  "task": "en-to-zh",
  "input": "Hello, how are you?",
  "timeout_ms": 5000
}\r\n

→ MSG _INBOX.6 1 96\r\n
{
  "output": "你好,你好吗?",
  "latency_ms": 230,
  "model": "gpt-4",
  "tokens": {"input": 12, "output": 8}
}\r\n

Invoke by Capability (Auto-routing)

Without specifying a specific Agent, invoke by capability type — automatically load-balanced by Queue Group:

# Send to capability Subject; any available Agent handles it
PUB ai.agent.translation.en-to-zh _INBOX.7 48\r\n
{"input": "Good morning", "task": "translate"}\r\n

Tool Registration and Invocation

# Register a tool
PUB $mq9.AI.API.TOOL.REGISTER _INBOX.8 96\r\n
{
  "name": "web-search",
  "description": "Search the web for information",
  "parameters": {
    "query": {"type": "string", "required": true},
    "max_results": {"type": "int", "default": 10}
  }
}\r\n

# Invoke a tool
PUB $mq9.AI.API.TOOL.INVOKE.web-search _INBOX.9 48\r\n
{"query": "RobustMQ architecture", "max_results": 5}\r\n

Task Orchestration

# Create a multi-step task
PUB $mq9.AI.API.PIPELINE.CREATE _INBOX.10 256\r\n
{
  "name": "translate-and-summarize",
  "steps": [
    {"agent": "translator-01", "task": "en-to-zh", "input_from": "user"},
    {"agent": "summarizer-01", "task": "summarize", "input_from": "step.0.output"}
  ]
}\r\n

# Execute the task
PUB $mq9.AI.API.PIPELINE.RUN.translate-and-summarize _INBOX.11 64\r\n
{"input": "A long English article..."}\r\n

Agent Health Check and Monitoring

# Heartbeat report (Agent sends periodically)
PUB $mq9.AI.API.AGENT.HEARTBEAT.translator-01 0\r\n
\r\n

# Query Agent status
PUB $mq9.AI.API.AGENT.STATUS.translator-01 _INBOX.12 0\r\n
\r\n

→ MSG _INBOX.12 1 128\r\n
{
  "name": "translator-01",
  "status": "healthy",
  "active_tasks": 3,
  "total_processed": 12847,
  "avg_latency_ms": 245,
  "uptime_seconds": 86400
}\r\n

Integration with RobustMQ's Unified Storage

The $mq9.AI.API.* capabilities do not exist in isolation — they are deeply integrated with RobustMQ's unified storage model:

IoT data-driven AI inference: devices report data via MQTT, which is written to RobustMQ's unified storage. AI Agents subscribe to the data stream via NATS and perform real-time inference. Inference results are written back to storage, and business systems consume them via the Kafka-compatible protocol. Everything happens within a single cluster.

MQTT devices → [Unified Storage] → NATS AI Agent → [Unified Storage] → Kafka business consumption

Inference result persistence: AI Agent inference results can be published directly to ordinary Subjects and persisted by the RobustMQ storage engine. Other Agents or business systems can replay these results by offset for auditing, analysis, or secondary processing.

Agent state management: Agent registration information, health status, and task history can be stored in RobustMQ, leveraging the unified storage's persistence and multi-view consumption capabilities for management.

Protocol Roadmap

Phase 1 (current): MQTT core → production-ready
Phase 2: Kafka protocol compatibility → data pipeline scenarios
Phase 3: NATS protocol compatibility → lightweight messaging + AI foundation
Phase 4: $mq9.AI.API.* extension → AI Agent communication and orchestration

Each phase builds on the previous one. The cost of implementing NATS protocol compatibility itself is relatively low (text protocol, simple commands), and the AI extension is pure business-layer logic on top of NATS compatibility — no protocol-layer changes involved.

Differentiation

The differentiation of this approach lies in:

  1. No new protocol invented: by leveraging NATS's protocol and ecosystem, AI developers can integrate using their existing NATS SDK with zero switching cost.
  2. No dependency on external AI frameworks: Agent registration, discovery, invocation, and load balancing are all handled within RobustMQ itself, without needing frameworks like LangChain or AutoGen as the communication layer.
  3. Unified storage connects the data pipeline: IoT data collection (MQTT) → AI inference (NATS) → result consumption (Kafka), all within a single cluster, with data written only once.
  4. RobustMQ-exclusive: $mq9.AI.API.* is an extension we define; other NATS implementations do not have this capability. This is a differentiating feature on top of protocol compatibility.

Disclaimer

This article represents technical planning and vision for the RobustMQ project, documenting our thinking on the direction of combining AI with messaging systems. Specific implementation details and API design may be adjusted during development. Community discussion and feedback are welcome.

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀