Skip to content

mq9 概览

mq9 是什么

mq9 是 RobustMQ 专门为 AI Agent 设计的通信协议层,与 MQTT、Kafka、NATS、AMQP 并列,共享同一套统一存储架构。

img

解决什么问题

多 Agent 系统里,Agent 不是服务器——它们是任务驱动的,启动、执行、消亡,随时上下线。Agent A 给 Agent B 发消息,B 不在线,消息直接丢了。每个构建多 Agent 系统的团队都在用临时方案绕过这个问题:

  • Redis pub/sub:没有持久化,对方不在线消息直接丢
  • Kafka:topic 需要提前创建和维护,不适合日抛型 Agent
  • 自研队列:每个团队重复造,Agent 之间无法互通

这些方案能用,但都是绕路——离线送达这件事被当做边界条件手动处理,而不是被基础设施直接保证。

mq9 直接解决它:发出去,对方上线自然收到。 就像人有邮箱——发出去,对方什么时候看是对方的事,消息不会丢。

今天一个系统可能有几十个 Agent,未来可能几百万个。mq9 的设计起点就是这个规模:邮箱按需创建、TTL 自动销毁、Broker 水平扩展。从第一个 Agent 到几百万个,接口不变,运维模型不变。

当前能力

已落地:邮箱生命周期(TTL 自动销毁)、三级优先级消息(critical / urgent / normal)、先存储后推送、竞争消费、公开邮箱发现,以及 Python SDK、LangChain/LangGraph 工具包、MCP Server 接入。

未来方向

邮箱解决了"消息能送达"的问题,但 Agent 网络真正成熟后,还需要语义发现(Agent 描述需求而非指定地址)、意图路由(消息自动找到最合适的接收者)、策略拦截(传输层感知语义并执行访问控制)、上下文感知(会话历史随消息流转,减少 Token 重复传递)。

这四个方向是 mq9 的演进路线,详见 发展规划。背后的判断和思考见 AI 时代的消息系统应该是什么样的


定位

mq9 不是通用消息队列,不与 MQTT 或 Kafka 竞争,也不替代它们。它专门针对 AI Agent 异步通信这一场景。HTTP 和 A2A 协议解决同步调用问题——调用方必须等待,对方必须在线。mq9 解决异步通信问题——发出去,对方什么时候在线什么时候处理。两者不重叠,不竞争。

在 RobustMQ 中的位置

mq9 是 RobustMQ 的第五个原生协议,与 MQTT、Kafka、NATS、AMQP 共享同一套统一存储架构。部署一个 RobustMQ,mq9 的能力全部就位。IoT 设备通过 MQTT 发数据,分析系统通过 Kafka 消费,Agent 通过 mq9 协作——同一个 Broker,同一份存储,零桥接,零复制。

在 NATS 生态中的位置

mq9 构建在 NATS 协议之上,但 NATS 只是通信层——客户端与 Broker 之间的传输协议,就像 HTTP 是 Web 的传输协议一样。mq9 的 Broker 是 RobustMQ 用 Rust 完全自研实现的,存储、优先级调度、TTL 管理、先存储后推送语义,全部是 RobustMQ 自身的能力,与 NATS Server 没有任何关系。

选择 NATS 协议的原因是务实的:NATS 有覆盖 40+ 语言的官方和社区客户端,AI 领域常用的 Python、Go、JavaScript、Rust 均有成熟实现,mq9 从第一天起就对所有这些语言的开发者开箱即用,无需等待 SDK 覆盖。NATS 的 pub/sub 和 request/reply 原语也恰好覆盖 mq9 所需的全部通信模式。

在语义层面,mq9 介于 NATS Core 和 JetStream 之间。Core NATS 太轻——没有持久化,离线就丢。JetStream 太重——stream、consumer、offset、ACK,为 Agent 发个消息不需要这些。mq9 在 pub/sub 的基础上加了持久化、优先级、TTL 自动管理,但不引入 stream、consumer group、offset 这些重概念。


核心抽象:邮箱

mq9 只有一个核心抽象:邮箱(MAILBOX)

为什么是邮箱?因为 mq9 把 Agent 当做人。人与人之间最自然的异步沟通方式是邮箱——你写好发出去,对方什么时候看是对方的事,你不用等着,消息也不会丢。Agent 之间的通信本质上是同一种场景:发出去,对方上线自然收到。邮箱这个语义是最直观的映射。

顺着这个类比往下走:

  • 地址:每个邮箱有一个 mail_address,就是它的通信地址。私有邮箱的地址系统生成、不可猜测,像一个只有你知道的收件箱;公开邮箱的地址用户自定义(如 task.queue),像一个对外公开的部门信箱,任何人都能找到并投递。

  • 信件:发给邮箱的每条消息都有优先级——普通信件(normal,默认)、加急(urgent)、特急(critical)。优先级编码在投递地址里,不在信件内容里。收件时按优先级顺序先收特急、再收加急、最后普通,同级按到达顺序。

  • 离线投递:你不在的时候,信照样投进来,等你回来再取。mq9 的先存储后推送语义与此完全一致——消息先落存储,订阅者上线时全量推送所有未过期消息,顺序和优先级由服务端保证,不会因为离线而丢失。

  • 信箱寿命:邮箱在创建时声明 TTL,到期自动销毁,所有未取的消息随之清理。不需要手动关闭,任务结束就忘掉,系统自己负责清理。

  • 安全边界mail_address 不可猜测即安全边界。知道地址就能投递和接收,不知道地址就无从操作。没有 token,没有 ACL,地址本身就是凭证。

两种邮箱:

私有邮箱公开邮箱
mail_address系统生成,不可猜测用户自定义,有意义的名字
可发现性不公开,只有知道 mail_address 的 Agent 能找到自动注册到 PUBLIC.LIST,任何 Agent 可发现
适合场景点对点私信、任务结果回传任务队列、公共频道、能力公告

操作一览

操作Subject说明
创建邮箱$mq9.AI.MAILBOX.CREATE创建私有或公开邮箱,幂等
发消息(normal)$mq9.AI.MAILBOX.MSG.{mail_address}默认优先级,无后缀
发消息(urgent)$mq9.AI.MAILBOX.MSG.{mail_address}.urgent紧急优先级
发消息(critical)$mq9.AI.MAILBOX.MSG.{mail_address}.critical最高优先级
订阅邮箱$mq9.AI.MAILBOX.MSG.{mail_address}.*订阅所有优先级
列出消息$mq9.AI.MAILBOX.LIST.{mail_address}返回消息元数据(不消费)
删除消息$mq9.AI.MAILBOX.DELETE.{mail_address}.{msg_id}删除指定消息
发现公开邮箱$mq9.AI.PUBLIC.LIST系统内置,订阅即全量推送

三个优先级:

级别典型场景
critical(最高)中止信号、紧急指令、安全事件
urgent(紧急)任务中断、时效性指令
normal(默认,无后缀)任务分发、结果返回、常规通信

设计原则

先存储后推送:消息到达先写存储层,在线订阅者走实时路径,不在线则等待,下次订阅时全量推送所有未过期消息。Agent 重连不漏消息。

mail_address 不绑定 Agent 身份:mq9 只认 mail_address,不认 agent_id。一个 Agent 可以为不同任务申请不同的邮箱,用完不管,TTL 自动清理。通道级设计,不是身份级设计。

不创建新概念:订阅复用 NATS 原生 sub 语义,竞争消费复用 NATS queue group,reply-to 复用 NATS 原生机制。

Broker 完全自研:NATS 只是传输协议。存储、优先级调度、TTL 管理、先存储后推送语义——全部是 RobustMQ 用 Rust 自研实现的能力,运行在 RobustMQ 统一存储层之上。

单机即可,按需升级:单机部署满足大量需求,一行命令启动。需要高可用时切集群,接口不变,Agent 无感知。

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀