Skip to content
RobustMQ 第五个原生协议层

mq9

AI Agent 通信层

Agent 之间的消息,发出去就不会丢。mq9 是专为 AI Agent 设计的邮箱系统——离线照样收,优先级自动排,任何 NATS 客户端直连。

基于 NATS 协议 · 支持 NATS 客户端 / RobustMQ SDK / LangChain / MCP Server

今天的问题

人和人之间有飞书、钉钉、邮件。我发出去,你空了来看,不需要同时在线。

Agent 和 Agent 之间呢?

今天,Agent A 给 Agent B 发消息,B 不在线,消息直接丢了。每个团队都在用 Redis pub/sub、轮询数据库、自研队列绕过这个问题。能用,但都是绕路。

mq9 直接解决它:发出去,对方上线自然收到。

今天
Agent A
消息丢失
Agent B 离线
mq9
Agent A
📬 邮箱等待
Agent B 上线后收到

覆盖 Agent 异步通信的所有场景

📬

邮箱

点对点异步投递

# Create a mailbox
nats pub '$mq9.AI.MAILBOX.CREATE' '{"ttl":3600}'
# → {"mail_address":"m-001"}

# Send (offline-safe, default priority)
nats pub '$mq9.AI.MAILBOX.m-001' \
  '{"msg_id":"msg-1","type":"task_result","ts":1234}'

# Subscribe (all priorities)
nats sub '$mq9.AI.MAILBOX.m-001.*'
📡

公开邮箱

任意可发可订的公共频道

# Create a public mailbox
nats pub '$mq9.AI.MAILBOX.CREATE' \
  '{"ttl":86400,"public":true,"name":"task.queue"}'
# → {"mail_address":"task.queue"}

# Publish (default priority, no suffix)
nats pub '$mq9.AI.MAILBOX.task.queue' \
  '{"msg_id":"t-001","type":"analysis"}'

# Compete via queue group
nats sub '$mq9.AI.MAILBOX.task.queue.*' --queue workers

# Discover public mailboxes
nats sub '$mq9.AI.PUBLIC.LIST'

优先级

紧急消息先处理

# critical — highest, persisted, processed first
nats pub '$mq9.AI.MAILBOX.m-001.critical' \
  '{"msg_id":"c-1","type":"emergency_stop"}'

# urgent — high priority, persisted
nats pub '$mq9.AI.MAILBOX.m-001.urgent' \
  '{"msg_id":"u-1","type":"interrupt"}'

# normal — default, no suffix, memory storage
nats pub '$mq9.AI.MAILBOX.m-001' \
  '{"msg_id":"n-1","type":"task"}'

八个真实使用场景

01

子 Agent 通知主 Agent

子 Agent 完成任务,发结果到主 Agent 邮箱。主 Agent 不需要阻塞等待,空了来取。

02

感知所有子 Agent 状态

Worker 创建公开邮箱定期上报状态,主 Agent 订阅 PUBLIC.LIST 发现所有 Worker。TTL 过期自动感知消亡,不需要注册或注销。

03

任务队列竞争消费

创建公开邮箱作为任务队列,Worker 用 queue group 竞争消费,每条任务只被一个 Worker 处理。离线 Worker 上线后也能收到未过期任务。

04

异常告警广播

创建公开邮箱发异常事件,订阅者自行响应。离线的 handler 上线后也能收到未过期的告警。发布方不需要维护订阅列表。

05

边缘 Agent 离线积压

云端给边缘 Agent 邮箱发指令,边缘断网消息持久化等待,联网后按优先级处理——critical 先于 urgent 先于 normal。

06

人机混合工作流

Agent 发审批请求到人类审批员邮箱,审批员处理后发结果回 Agent 邮箱。人和 Agent 用完全相同的协议,流程不中断。

07

异步 Request-Reply

Agent A 发请求到 Agent B 邮箱,带 reply_to 和 correlation_id。B 上线后处理并回复到 A 邮箱。离线不丢,A 不阻塞。

08

能力注册与发现

Agent 启动时创建公开邮箱声明能力,自动注册到 PUBLIC.LIST,其他 Agent 订阅即可感知整个网络。去中心化,零额外服务。

三种接入方式,按需选择

🔌

原生 NATS 客户端

mq9 基于 NATS 协议。任何语言的 NATS 客户端直接就是 mq9 的客户端,零依赖,零学习成本。

GoPythonRustJavaJavaScriptC#RubyElixir
🤖

AI 框架集成

官方 LangChain 工具包,6 个工具覆盖全部 mq9 操作,直接接入 LangChain Agent 和 LangGraph 工作流。

pip install langchain-mq9
LangChainLangGraphMCP Server

第五个原生协议,同一套存储

mq9 与 MQTT、Kafka、NATS、AMQP 并列,共享同一套统一存储架构。部署一个 RobustMQ,五个协议全部就位。

MQTTIoT 设备
Kafka数据流管道
NATS轻量 Pub/Sub
AMQP企业消息
mq9AI Agent 通信
统一存储层

开始构建

单机部署,一行命令,Agent 邮箱就绪。

curl -fsSL https://raw.githubusercontent.com/robustmq/robustmq/main/scripts/install.sh | bash
broker-server start

# Create a mailbox — returns mail_address
nats req '$mq9.AI.MAILBOX.CREATE' '{"ttl":3600}'

# Send (default priority, no suffix — works even if recipient is offline)
nats pub '$mq9.AI.MAILBOX.{mail_address}' '{"msg_id":"msg-1","from":"...","type":"task","ts":1234567890}'

# Send critical (highest priority, persisted)
nats pub '$mq9.AI.MAILBOX.{mail_address}.critical' '{"msg_id":"msg-2","type":"abort"}'