在 LangChain Agent 中用 mq9 实现异步通信
问题从这里开始
假设你在用 LangChain 构建一个多 Agent 系统。一个 Agent 负责抓取数据,另一个负责分析,第三个负责生成报告。
最直接的写法是链式调用:Agent A 调用 Agent B,B 调用 C,同步等待。代码简单,逻辑清晰。
但很快你会碰到问题:
- Agent A 抓取数据需要 30 秒,Agent B 在等,什么都不能做
- Agent B 挂了,整个链路断掉,Agent A 的结果丢了
- Agent C 处理能力有限,高峰期积压,上游没有任何缓冲
- 你想让 Agent A 同时给三个不同的 Agent 发消息,同步调用做不到
这不是代码写法的问题,是架构问题。同步调用把 Agent 耦合在了一起,任何一个节点的延迟或故障都会传导到整个链路。
mq9 是什么
mq9 是一个专为 Agent 异步通信设计的消息协议,核心概念是 Mailbox(邮箱)。
每个 Agent 有一个邮箱,其他 Agent 往邮箱里投消息,Agent 随时来取。消息持久化存储,TTL 到期自动清理。发送方和接收方不需要同时在线。
几个关键特性:
- Mailbox 是 Agent 的地址,私有(UUID)或公开(自定义名称)
- 优先级 支持 critical / urgent / normal 三级,高优先级(critical)消息先被消费
- Store-first 订阅后自动补发所有未过期的历史消息,再推送实时消息
- Queue group 多个 Agent 共享一个 group,每条消息只被处理一次
mq9 基于 NATS 协议实现,提供了原生的 LangChain Toolkit 集成,几行代码接入。
动手:三分钟接入
安装
pip install langchain langchain-openai langchain-mq9最简示例
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
agent = initialize_agent(
tools,
ChatOpenAI(model="gpt-4o-mini"),
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
agent.invoke({"input": "创建一个 Mailbox 用来接收任务结果。"})Toolkit 包含六个工具,覆盖 mq9 协议的全部操作:
| 工具 | 功能 |
|---|---|
CreateMailboxTool | 创建私有 Mailbox,返回 UUID |
CreatePublicMailboxTool | 创建公开 Mailbox,用自定义名称作为地址 |
SendMessageTool | 向任意 Mailbox 发送消息,支持优先级 |
GetMessagesTool | 读取 Mailbox 中的消息(含完整内容) |
ListMessagesTool | 列出 Mailbox 中的消息元数据(msg_id、priority、ts),不含消息体 |
DeleteMessageTool | 删除指定消息(可选操作) |
完整场景:DataAgent 和 AnalysisAgent
下面演示两个 Agent 通过 mq9 解耦通信的完整场景:
- DataAgent:抓取数据,发给 AnalysisAgent,不等待结果
- AnalysisAgent:监听 Mailbox,收到数据后分析处理
DataAgent
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
data_agent = initialize_agent(
tools,
ChatOpenAI(model="gpt-4o-mini"),
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
# 创建 Mailbox,将 mail_id 告知其他 Agent,发送数据,立即返回
data_agent.invoke({
"input": """
1. 创建一个 Mailbox(私有或公开均可),TTL 设为 3600 秒,记录返回的 mail_id
2. 向这个 Mailbox 发送一条普通优先级(normal)消息:'dataset_v1: [record_1, record_2, record_3]'
3. 发完立即结束,不需要等待处理结果
"""
})AnalysisAgent
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
analysis_agent = initialize_agent(
tools,
ChatOpenAI(model="gpt-4o-mini"),
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
# 两种方式:GetMessagesTool 一次性读取所有消息,或 subscribe 增量订阅实时消息
analysis_agent.invoke({
"input": """
读取 Mailbox 中的消息并完成分析。
可一次性获取所有现有消息(GetMessagesTool),也可增量订阅持续接收新消息(subscribe)。
"""
})DataAgent 发完消息立刻返回,继续处理下一个任务。AnalysisAgent 在自己的节奏下处理消息,两者完全解耦。
优先级:紧急任务优先处理
mq9 内置三级优先级,不需要额外配置。同一个 Mailbox 里,critical 消息始终在 urgent 和 normal 之前被消费。
# 发送紧急任务
agent.invoke({
"input": "向 'analysis-inbox' 发送一条高优先级(critical)消息:'URGENT: 系统异常数据需要立即分析'"
})
# 发送后台任务
agent.invoke({
"input": "向 'analysis-inbox' 发送一条 urgent 优先级消息:'BACKGROUND: 历史数据归档分析'"
})无论发送顺序如何,AnalysisAgent 总是按 critical → urgent → normal 顺序处理消息。
一对多:多个 Agent 订阅同一个 Mailbox
创建一个 Mailbox,把 mail_id 告知多个 Agent,任意数量的 Agent 都可以订阅它,发送方只需往这个 Mailbox 发消息。
# DataAgent:创建 Mailbox,发送消息
data_agent.invoke({
"input": """
创建一个 Mailbox,TTL 3600 秒,记录返回的 mail_id。
向这个 Mailbox 发送消息:'analysis_result_v1: insights=[A, B, C]'
"""
})
# ReportAgent:订阅同一个 Mailbox
report_agent.invoke({
"input": "订阅 mail_id 为 {mail_id} 的 Mailbox,收到消息后生成报告。"
})
# MonitorAgent:同样订阅这个 Mailbox
monitor_agent.invoke({
"input": "订阅 mail_id 为 {mail_id} 的 Mailbox,收到消息后更新监控面板。"
})发送方只发一次,所有订阅了这个 Mailbox 的 Agent 都能收到消息。随时可以增加新的订阅方,发送方不需要任何改动。
与直接调用的对比
| 直接调用 | mq9 异步通信 | |
|---|---|---|
| 耦合度 | 发送方必须知道接收方接口 | 发送方只需知道 Mailbox 名称 |
| 可用性 | 接收方挂了,发送方失败 | 接收方挂了,消息持久化等待 |
| 并发 | 同步等待,阻塞 | 发完即走,不阻塞 |
| 扩展性 | 一对一调用 | 一对多,随时增加接收方 |
| 历史消息 | 不支持 | 新订阅者自动获取未过期消息 |
完整可运行示例
两个 Agent 通过 mq9 异步通信:DataAgent 创建 Mailbox 并发送消息,AnalysisAgent 读取并处理。无需本地部署,直接连接 demo server。
import asyncio
from langchain_mq9 import Mq9Toolkit, CreateMailboxTool, SendMessageTool, GetMessagesTool
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
SERVER = "nats://demo.robustmq.com:4222"
async def main():
llm = ChatOpenAI(model="gpt-4o-mini")
toolkit = Mq9Toolkit(server=SERVER)
tools = toolkit.get_tools()
# 1. 创建 Mailbox,拿到 mail_id
mail_id = await CreateMailboxTool(server=SERVER)._arun(ttl=3600)
print(f"Mailbox created: {mail_id}")
# 2. DataAgent:发送消息,发完立即返回
data_agent = initialize_agent(
tools, llm,
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
await data_agent.ainvoke({"input": f"向 mail_id 为 {mail_id} 的 Mailbox 发送两条消息:第一条优先级 critical,内容 'URGENT: anomaly detected in dataset_v1';第二条优先级 normal,内容 'dataset_v2: [record_1, record_2, record_3]'"})
print("DataAgent: 消息发送完毕,继续处理下一个任务...")
# 3. AnalysisAgent:读取 Mailbox 中的所有消息并处理
# critical 消息优先返回,无论发送顺序如何
analysis_agent = initialize_agent(
tools, llm,
agent=AgentType.OPENAI_FUNCTIONS,
verbose=True,
)
await analysis_agent.ainvoke({"input": f"读取 mail_id 为 {mail_id} 的 Mailbox 中的所有消息,按优先级顺序输出每条消息内容并完成分析。"})
asyncio.run(main())预期输出:
Mailbox created: m-550e8400-e29b-41d4-a716-446655440000
DataAgent: 消息发送完毕,继续处理下一个任务...
> Entering new AgentExecutor chain...
> Using tool: GetMessagesTool
Found 2 message(s):
[1] priority=critical content=URGENT: anomaly detected in dataset_v1
[2] priority=normal content=dataset_v2: [record_1, record_2, record_3]critical 消息无论发送顺序如何,始终优先返回。
什么时候用 mq9
适合的场景:
- Agent 之间有明显的生产者/消费者关系
- 任务处理时间长,不想让上游等待
- 需要缓冲,接收方处理能力有限
- 一个结果需要发给多个 Agent 处理
- Agent 可能离线,需要消息持久化
不需要用 mq9 的场景:
- 需要立即得到返回值(用直接调用)
- 简单的两步流程,没有并发需求
相关资源
- mq9 协议规范:docs/mq9-protocol.md
- LangChain 集成:langchain-mq9
- RobustMQ:github.com/robustmq/robustmq
- Demo server:
nats://demo.robustmq.com:4222(可直接连接测试,无需本地部署)
