Skip to content

RobustMQ AI:基于 NATS 协议的 AI 能力扩展规划

背景与思考

RobustMQ 的定位是多协议统一消息引擎,底层一份存储,上层支持 MQTT、Kafka、AMQP 等多种协议的消费视角。在规划 AI 相关能力时,我们面临一个核心问题:AI Agent 之间的通信需要什么样的协议?

最初的思路是设计一个全新的 AI 专用协议。但这条路意味着新的线上协议规范、新的 SDK、新的客户端生态,全部从零开始。对于一个基础软件项目来说,协议生态的冷启动成本极高,这个方向被否决了。

转机来自对 NATS 协议的深入分析。NATS JetStream 的扩展方式给了我们启发:JetStream 没有引入任何新的线上协议指令,所有操作都通过标准的 PUB/SUB 完成,服务端暴露 $JS.API.* 的 Subject 空间,客户端发 JSON 请求,服务端返回 JSON 响应。JetStream 的全部能力——Stream 管理、Consumer 管理、ACK 机制——都是在 NATS 协议上的应用层扩展。

同样的思路完全可以用在 AI 能力上。 我们可以在 NATS 协议上定义 $mq9.AI.API.* 的 Subject 空间,用标准的 NATS Request-Reply 模式实现 AI Agent 的注册、发现、调用、编排。不需要新协议,不需要新 SDK,任何 NATS 客户端(Go、Rust、Python、Java、JavaScript……)都能直接接入。

为什么是 NATS

在 RobustMQ 支持的所有协议中,NATS 是最适合 AI 场景的,原因如下:

文本协议,AI 原生友好

NATS 是基于 TCP 的纯文本协议,PUBSUBMSG 就是几个简单的文本命令。LLM 可以直接生成和解析这种格式,不需要二进制编解码。AI Agent 甚至可以直接通过 TCP 连接与 RobustMQ 交互,零依赖。

相比之下,Kafka 的二进制协议需要专用客户端库,MQTT 的二进制帧结构需要编解码器,AMQP 的帧协议更是复杂。对于 AI Agent 这种需要快速集成、轻量通信的场景,文本协议的优势是决定性的。

Request-Reply 天然适配 Agent 模式

AI Agent 的核心交互模式是"发请求,等响应"——发送一个推理任务,等待模型返回结果。NATS 的 Request-Reply 正是这个模式:发一条带 reply-to 的消息,服务端(或另一个 Agent)处理后把结果发到 reply-to 地址。

这种模式天然适配:

  • Agent 之间的任务委派和结果回传
  • 工具调用(Tool Use):Agent 发请求调用外部工具,等待工具返回结果
  • Chain of Thought 的中间结果传递
  • 多 Agent 协作的请求-响应链

Subject 层级命名适配能力路由

NATS 的 Subject 采用 . 分隔的层级命名结构,支持 *> 通配符。这天然适合 AI 能力的组织和路由:

ai.agent.translation.en-to-zh    # 翻译 Agent
ai.agent.code.review              # 代码审查 Agent
ai.agent.code.generation          # 代码生成 Agent
ai.tool.search.web                # Web 搜索工具
ai.tool.search.database            # 数据库查询工具

调度器可以用通配符监听:

ai.agent.>        # 监听所有 Agent
ai.tool.*         # 监听所有工具的顶层调用
ai.agent.code.*   # 监听所有代码相关的 Agent

Queue Group 天然做 Agent 负载均衡

多个相同能力的 AI Agent 加入同一个 Queue Group,NATS 自动将请求分配给其中一个 Agent 处理。不需要额外的负载均衡器或调度器。

# 三个翻译 Agent 实例加入同一个 Queue Group
SUB ai.agent.translation.en-to-zh translators 1
SUB ai.agent.translation.en-to-zh translators 2
SUB ai.agent.translation.en-to-zh translators 3

每条翻译请求只会被一个 Agent 处理,天然实现水平扩展。

百万级 TPS 支撑高频 Agent 通信

AI Agent 集群的通信频率可能很高——多 Agent 协作、工具调用链、实时推理请求。NATS 的设计目标就是百万级 TPS、亚毫秒延迟,完全撑得住这种负载。

现成的 SDK 和生态

NATS 官方提供 Go、Rust、Python、Java、JavaScript、C#、C、Ruby、Elixir 等客户端库,社区还有 30+ 语言的实现。AI 开发者用任何语言都能直接接入,零学习成本。

扩展设计:$mq9.AI.API.*

借鉴 JetStream 的 $JS.API.* 模式,我们在 NATS 协议上定义 $mq9.AI.API.* Subject 空间,用于 AI 能力的管理和调用。所有操作都是标准的 NATS Request-Reply,请求和响应体为 JSON。

Agent 管理

# 注册 Agent
PUB $mq9.AI.API.AGENT.REGISTER _INBOX.1 128\r\n
{
  "name": "translator-01",
  "type": "translation",
  "capabilities": ["en-to-zh", "zh-to-en", "en-to-ja"],
  "max_concurrency": 10,
  "model": "gpt-4",
  "metadata": {"version": "1.0", "region": "us-east"}
}\r\n

# 注销 Agent
PUB $mq9.AI.API.AGENT.DEREGISTER.translator-01 _INBOX.2 0\r\n
\r\n

# 查询 Agent 信息
PUB $mq9.AI.API.AGENT.INFO.translator-01 _INBOX.3 0\r\n
\r\n

# 列出所有 Agent
PUB $mq9.AI.API.AGENT.LIST _INBOX.4 2\r\n
{}\r\n

# 按能力查询可用 Agent
PUB $mq9.AI.API.AGENT.DISCOVER _INBOX.5 32\r\n
{"capability": "en-to-zh"}\r\n

Agent 调用

# 同步调用(Request-Reply)
PUB $mq9.AI.API.AGENT.INVOKE.translator-01 _INBOX.6 64\r\n
{
  "task": "en-to-zh",
  "input": "Hello, how are you?",
  "timeout_ms": 5000
}\r\n

→ MSG _INBOX.6 1 96\r\n
{
  "output": "你好,你好吗?",
  "latency_ms": 230,
  "model": "gpt-4",
  "tokens": {"input": 12, "output": 8}
}\r\n

按能力调用(自动路由)

不指定具体 Agent,按能力类型调用,由 Queue Group 自动负载均衡:

# 发送到能力 Subject,任意可用 Agent 处理
PUB ai.agent.translation.en-to-zh _INBOX.7 48\r\n
{"input": "Good morning", "task": "translate"}\r\n

工具注册与调用

# 注册工具
PUB $mq9.AI.API.TOOL.REGISTER _INBOX.8 96\r\n
{
  "name": "web-search",
  "description": "Search the web for information",
  "parameters": {
    "query": {"type": "string", "required": true},
    "max_results": {"type": "int", "default": 10}
  }
}\r\n

# 调用工具
PUB $mq9.AI.API.TOOL.INVOKE.web-search _INBOX.9 48\r\n
{"query": "RobustMQ architecture", "max_results": 5}\r\n

任务编排

# 创建多步骤任务
PUB $mq9.AI.API.PIPELINE.CREATE _INBOX.10 256\r\n
{
  "name": "translate-and-summarize",
  "steps": [
    {"agent": "translator-01", "task": "en-to-zh", "input_from": "user"},
    {"agent": "summarizer-01", "task": "summarize", "input_from": "step.0.output"}
  ]
}\r\n

# 执行任务
PUB $mq9.AI.API.PIPELINE.RUN.translate-and-summarize _INBOX.11 64\r\n
{"input": "A long English article..."}\r\n

Agent 健康检查与监控

# 心跳上报(Agent 定期发送)
PUB $mq9.AI.API.AGENT.HEARTBEAT.translator-01 0\r\n
\r\n

# 查询 Agent 状态
PUB $mq9.AI.API.AGENT.STATUS.translator-01 _INBOX.12 0\r\n
\r\n

→ MSG _INBOX.12 1 128\r\n
{
  "name": "translator-01",
  "status": "healthy",
  "active_tasks": 3,
  "total_processed": 12847,
  "avg_latency_ms": 245,
  "uptime_seconds": 86400
}\r\n

与 RobustMQ 统一存储的结合

$mq9.AI.API.* 的能力不是独立存在的,它与 RobustMQ 的统一存储模型深度结合:

IoT 数据驱动 AI 推理:设备通过 MQTT 上报数据,数据写入 RobustMQ 统一存储。AI Agent 通过 NATS 订阅数据流,实时做推理。推理结果写回存储,业务系统通过 Kafka 兼容协议消费。全程在一个集群内完成。

MQTT 设备 → [统一存储] → NATS AI Agent → [统一存储] → Kafka 业务消费

推理结果持久化:AI Agent 的推理结果可以直接发布到普通 Subject,被 RobustMQ 存储引擎持久化。其他 Agent 或业务系统可以通过 offset 回放这些结果,做审计、分析或二次处理。

Agent 状态管理:Agent 注册信息、健康状态、任务历史可以存储在 RobustMQ 中,利用统一存储的持久化和多视图消费能力做管理。

协议路线图

Phase 1 (当前):MQTT 核心 → 生产可用
Phase 2:Kafka 协议兼容 → 数据管道场景
Phase 3:NATS 协议兼容 → 轻量消息 + AI 基础
Phase 4:$mq9.AI.API.* 扩展 → AI Agent 通信和编排

每一步都建立在上一步的基础上。NATS 协议本身的实现成本相对较低(文本协议、命令简单),AI 扩展是在 NATS 兼容之上的纯业务层逻辑,不涉及协议层改动。

差异化

这套方案的差异化在于:

  1. 不发明新协议:借用 NATS 的协议和生态,AI 开发者用现有的 NATS SDK 就能接入,零切换成本。
  2. 不依赖外部 AI 框架:Agent 注册、发现、调用、负载均衡全在 RobustMQ 内部完成,不需要 LangChain、AutoGen 等外部框架做通信层。
  3. 统一存储打通数据链路:IoT 数据采集(MQTT)→ AI 推理(NATS)→ 结果消费(Kafka),全在一个集群里,数据只写一份。
  4. RobustMQ 独有$mq9.AI.API.* 是我们定义的扩展,其他 NATS 实现没有这个能力。这是协议兼容之上的差异化功能。

声明

本文为 RobustMQ 项目的技术规划和设想,记录我们对 AI + 消息系统结合方向的思考。具体实现方案和 API 设计可能在开发过程中调整。欢迎社区讨论和反馈。

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀