RobustMQ AI:基于 NATS 协议的 AI 能力扩展规划
背景与思考
RobustMQ 的定位是多协议统一消息引擎,底层一份存储,上层支持 MQTT、Kafka、AMQP 等多种协议的消费视角。在规划 AI 相关能力时,我们面临一个核心问题:AI Agent 之间的通信需要什么样的协议?
最初的思路是设计一个全新的 AI 专用协议。但这条路意味着新的线上协议规范、新的 SDK、新的客户端生态,全部从零开始。对于一个基础软件项目来说,协议生态的冷启动成本极高,这个方向被否决了。
转机来自对 NATS 协议的深入分析。NATS JetStream 的扩展方式给了我们启发:JetStream 没有引入任何新的线上协议指令,所有操作都通过标准的 PUB/SUB 完成,服务端暴露 $JS.API.* 的 Subject 空间,客户端发 JSON 请求,服务端返回 JSON 响应。JetStream 的全部能力——Stream 管理、Consumer 管理、ACK 机制——都是在 NATS 协议上的应用层扩展。
同样的思路完全可以用在 AI 能力上。 我们可以在 NATS 协议上定义 $mq9.AI.API.* 的 Subject 空间,用标准的 NATS Request-Reply 模式实现 AI Agent 的注册、发现、调用、编排。不需要新协议,不需要新 SDK,任何 NATS 客户端(Go、Rust、Python、Java、JavaScript……)都能直接接入。
为什么是 NATS
在 RobustMQ 支持的所有协议中,NATS 是最适合 AI 场景的,原因如下:
文本协议,AI 原生友好
NATS 是基于 TCP 的纯文本协议,PUB、SUB、MSG 就是几个简单的文本命令。LLM 可以直接生成和解析这种格式,不需要二进制编解码。AI Agent 甚至可以直接通过 TCP 连接与 RobustMQ 交互,零依赖。
相比之下,Kafka 的二进制协议需要专用客户端库,MQTT 的二进制帧结构需要编解码器,AMQP 的帧协议更是复杂。对于 AI Agent 这种需要快速集成、轻量通信的场景,文本协议的优势是决定性的。
Request-Reply 天然适配 Agent 模式
AI Agent 的核心交互模式是"发请求,等响应"——发送一个推理任务,等待模型返回结果。NATS 的 Request-Reply 正是这个模式:发一条带 reply-to 的消息,服务端(或另一个 Agent)处理后把结果发到 reply-to 地址。
这种模式天然适配:
- Agent 之间的任务委派和结果回传
- 工具调用(Tool Use):Agent 发请求调用外部工具,等待工具返回结果
- Chain of Thought 的中间结果传递
- 多 Agent 协作的请求-响应链
Subject 层级命名适配能力路由
NATS 的 Subject 采用 . 分隔的层级命名结构,支持 * 和 > 通配符。这天然适合 AI 能力的组织和路由:
ai.agent.translation.en-to-zh # 翻译 Agent
ai.agent.code.review # 代码审查 Agent
ai.agent.code.generation # 代码生成 Agent
ai.tool.search.web # Web 搜索工具
ai.tool.search.database # 数据库查询工具调度器可以用通配符监听:
ai.agent.> # 监听所有 Agent
ai.tool.* # 监听所有工具的顶层调用
ai.agent.code.* # 监听所有代码相关的 AgentQueue Group 天然做 Agent 负载均衡
多个相同能力的 AI Agent 加入同一个 Queue Group,NATS 自动将请求分配给其中一个 Agent 处理。不需要额外的负载均衡器或调度器。
# 三个翻译 Agent 实例加入同一个 Queue Group
SUB ai.agent.translation.en-to-zh translators 1
SUB ai.agent.translation.en-to-zh translators 2
SUB ai.agent.translation.en-to-zh translators 3每条翻译请求只会被一个 Agent 处理,天然实现水平扩展。
百万级 TPS 支撑高频 Agent 通信
AI Agent 集群的通信频率可能很高——多 Agent 协作、工具调用链、实时推理请求。NATS 的设计目标就是百万级 TPS、亚毫秒延迟,完全撑得住这种负载。
现成的 SDK 和生态
NATS 官方提供 Go、Rust、Python、Java、JavaScript、C#、C、Ruby、Elixir 等客户端库,社区还有 30+ 语言的实现。AI 开发者用任何语言都能直接接入,零学习成本。
扩展设计:$mq9.AI.API.*
借鉴 JetStream 的 $JS.API.* 模式,我们在 NATS 协议上定义 $mq9.AI.API.* Subject 空间,用于 AI 能力的管理和调用。所有操作都是标准的 NATS Request-Reply,请求和响应体为 JSON。
Agent 管理
# 注册 Agent
PUB $mq9.AI.API.AGENT.REGISTER _INBOX.1 128\r\n
{
"name": "translator-01",
"type": "translation",
"capabilities": ["en-to-zh", "zh-to-en", "en-to-ja"],
"max_concurrency": 10,
"model": "gpt-4",
"metadata": {"version": "1.0", "region": "us-east"}
}\r\n
# 注销 Agent
PUB $mq9.AI.API.AGENT.DEREGISTER.translator-01 _INBOX.2 0\r\n
\r\n
# 查询 Agent 信息
PUB $mq9.AI.API.AGENT.INFO.translator-01 _INBOX.3 0\r\n
\r\n
# 列出所有 Agent
PUB $mq9.AI.API.AGENT.LIST _INBOX.4 2\r\n
{}\r\n
# 按能力查询可用 Agent
PUB $mq9.AI.API.AGENT.DISCOVER _INBOX.5 32\r\n
{"capability": "en-to-zh"}\r\nAgent 调用
# 同步调用(Request-Reply)
PUB $mq9.AI.API.AGENT.INVOKE.translator-01 _INBOX.6 64\r\n
{
"task": "en-to-zh",
"input": "Hello, how are you?",
"timeout_ms": 5000
}\r\n
→ MSG _INBOX.6 1 96\r\n
{
"output": "你好,你好吗?",
"latency_ms": 230,
"model": "gpt-4",
"tokens": {"input": 12, "output": 8}
}\r\n按能力调用(自动路由)
不指定具体 Agent,按能力类型调用,由 Queue Group 自动负载均衡:
# 发送到能力 Subject,任意可用 Agent 处理
PUB ai.agent.translation.en-to-zh _INBOX.7 48\r\n
{"input": "Good morning", "task": "translate"}\r\n工具注册与调用
# 注册工具
PUB $mq9.AI.API.TOOL.REGISTER _INBOX.8 96\r\n
{
"name": "web-search",
"description": "Search the web for information",
"parameters": {
"query": {"type": "string", "required": true},
"max_results": {"type": "int", "default": 10}
}
}\r\n
# 调用工具
PUB $mq9.AI.API.TOOL.INVOKE.web-search _INBOX.9 48\r\n
{"query": "RobustMQ architecture", "max_results": 5}\r\n任务编排
# 创建多步骤任务
PUB $mq9.AI.API.PIPELINE.CREATE _INBOX.10 256\r\n
{
"name": "translate-and-summarize",
"steps": [
{"agent": "translator-01", "task": "en-to-zh", "input_from": "user"},
{"agent": "summarizer-01", "task": "summarize", "input_from": "step.0.output"}
]
}\r\n
# 执行任务
PUB $mq9.AI.API.PIPELINE.RUN.translate-and-summarize _INBOX.11 64\r\n
{"input": "A long English article..."}\r\nAgent 健康检查与监控
# 心跳上报(Agent 定期发送)
PUB $mq9.AI.API.AGENT.HEARTBEAT.translator-01 0\r\n
\r\n
# 查询 Agent 状态
PUB $mq9.AI.API.AGENT.STATUS.translator-01 _INBOX.12 0\r\n
\r\n
→ MSG _INBOX.12 1 128\r\n
{
"name": "translator-01",
"status": "healthy",
"active_tasks": 3,
"total_processed": 12847,
"avg_latency_ms": 245,
"uptime_seconds": 86400
}\r\n与 RobustMQ 统一存储的结合
$mq9.AI.API.* 的能力不是独立存在的,它与 RobustMQ 的统一存储模型深度结合:
IoT 数据驱动 AI 推理:设备通过 MQTT 上报数据,数据写入 RobustMQ 统一存储。AI Agent 通过 NATS 订阅数据流,实时做推理。推理结果写回存储,业务系统通过 Kafka 兼容协议消费。全程在一个集群内完成。
MQTT 设备 → [统一存储] → NATS AI Agent → [统一存储] → Kafka 业务消费推理结果持久化:AI Agent 的推理结果可以直接发布到普通 Subject,被 RobustMQ 存储引擎持久化。其他 Agent 或业务系统可以通过 offset 回放这些结果,做审计、分析或二次处理。
Agent 状态管理:Agent 注册信息、健康状态、任务历史可以存储在 RobustMQ 中,利用统一存储的持久化和多视图消费能力做管理。
协议路线图
Phase 1 (当前):MQTT 核心 → 生产可用
Phase 2:Kafka 协议兼容 → 数据管道场景
Phase 3:NATS 协议兼容 → 轻量消息 + AI 基础
Phase 4:$mq9.AI.API.* 扩展 → AI Agent 通信和编排每一步都建立在上一步的基础上。NATS 协议本身的实现成本相对较低(文本协议、命令简单),AI 扩展是在 NATS 兼容之上的纯业务层逻辑,不涉及协议层改动。
差异化
这套方案的差异化在于:
- 不发明新协议:借用 NATS 的协议和生态,AI 开发者用现有的 NATS SDK 就能接入,零切换成本。
- 不依赖外部 AI 框架:Agent 注册、发现、调用、负载均衡全在 RobustMQ 内部完成,不需要 LangChain、AutoGen 等外部框架做通信层。
- 统一存储打通数据链路:IoT 数据采集(MQTT)→ AI 推理(NATS)→ 结果消费(Kafka),全在一个集群里,数据只写一份。
- RobustMQ 独有:
$mq9.AI.API.*是我们定义的扩展,其他 NATS 实现没有这个能力。这是协议兼容之上的差异化功能。
声明
本文为 RobustMQ 项目的技术规划和设想,记录我们对 AI + 消息系统结合方向的思考。具体实现方案和 API 设计可能在开发过程中调整。欢迎社区讨论和反馈。
