Skip to content

在 LangChain Agent 中用 mq9 实现异步通信

问题从这里开始

假设你在用 LangChain 构建一个多 Agent 系统。一个 Agent 负责抓取数据,另一个负责分析,第三个负责生成报告。

最直接的写法是链式调用:Agent A 调用 Agent B,B 调用 C,同步等待。代码简单,逻辑清晰。

但很快你会碰到问题:

  • Agent A 抓取数据需要 30 秒,Agent B 在等,什么都不能做
  • Agent B 挂了,整个链路断掉,Agent A 的结果丢了
  • Agent C 处理能力有限,高峰期积压,上游没有任何缓冲
  • 你想让 Agent A 同时给三个不同的 Agent 发消息,同步调用做不到

这不是代码写法的问题,是架构问题。同步调用把 Agent 耦合在了一起,任何一个节点的延迟或故障都会传导到整个链路。


mq9 是什么

mq9 是一个专为 Agent 异步通信设计的消息协议,核心概念是 Mailbox(邮箱)

每个 Agent 有一个邮箱,其他 Agent 往邮箱里投消息,Agent 随时来取。消息持久化存储,TTL 到期自动清理。发送方和接收方不需要同时在线。

几个关键特性:

  • Mailbox 是 Agent 的地址,私有(UUID)或公开(自定义名称)
  • 优先级 支持 critical / urgent / normal 三级,高优先级(critical)消息先被消费
  • Store-first 订阅后自动补发所有未过期的历史消息,再推送实时消息
  • Queue group 多个 Agent 共享一个 group,每条消息只被处理一次

mq9 基于 NATS 协议实现,提供了原生的 LangChain Toolkit 集成,几行代码接入。


动手:三分钟接入

安装

bash
pip install langchain langchain-openai langchain-mq9

最简示例

python
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI

toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()

agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)

agent.invoke({"input": "创建一个 Mailbox 用来接收任务结果。"})

Toolkit 包含六个工具,覆盖 mq9 协议的全部操作:

工具功能
CreateMailboxTool创建私有 Mailbox,返回 UUID
CreatePublicMailboxTool创建公开 Mailbox,用自定义名称作为地址
SendMessageTool向任意 Mailbox 发送消息,支持优先级
GetMessagesTool读取 Mailbox 中的消息(含完整内容)
ListMessagesTool列出 Mailbox 中的消息元数据(msg_id、priority、ts),不含消息体
DeleteMessageTool删除指定消息(可选操作)

完整场景:DataAgent 和 AnalysisAgent

下面演示两个 Agent 通过 mq9 解耦通信的完整场景:

  • DataAgent:抓取数据,发给 AnalysisAgent,不等待结果
  • AnalysisAgent:监听 Mailbox,收到数据后分析处理

DataAgent

python
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI

toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()

data_agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)

# 创建 Mailbox,将 mail_id 告知其他 Agent,发送数据,立即返回
data_agent.invoke({
    "input": """
    1. 创建一个 Mailbox(私有或公开均可),TTL 设为 3600 秒,记录返回的 mail_id
    2. 向这个 Mailbox 发送一条普通优先级(normal)消息:'dataset_v1: [record_1, record_2, record_3]'
    3. 发完立即结束,不需要等待处理结果
    """
})

AnalysisAgent

python
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI

toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()

analysis_agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)

# 两种方式:GetMessagesTool 一次性读取所有消息,或 subscribe 增量订阅实时消息
analysis_agent.invoke({
    "input": """
    读取 Mailbox 中的消息并完成分析。
    可一次性获取所有现有消息(GetMessagesTool),也可增量订阅持续接收新消息(subscribe)。
    """
})

DataAgent 发完消息立刻返回,继续处理下一个任务。AnalysisAgent 在自己的节奏下处理消息,两者完全解耦。


优先级:紧急任务优先处理

mq9 内置三级优先级,不需要额外配置。同一个 Mailbox 里,critical 消息始终在 urgent 和 normal 之前被消费。

python
# 发送紧急任务
agent.invoke({
    "input": "向 'analysis-inbox' 发送一条高优先级(critical)消息:'URGENT: 系统异常数据需要立即分析'"
})

# 发送后台任务
agent.invoke({
    "input": "向 'analysis-inbox' 发送一条 urgent 优先级消息:'BACKGROUND: 历史数据归档分析'"
})

无论发送顺序如何,AnalysisAgent 总是按 critical → urgent → normal 顺序处理消息。


一对多:多个 Agent 订阅同一个 Mailbox

创建一个 Mailbox,把 mail_id 告知多个 Agent,任意数量的 Agent 都可以订阅它,发送方只需往这个 Mailbox 发消息。

python
# DataAgent:创建 Mailbox,发送消息
data_agent.invoke({
    "input": """
    创建一个 Mailbox,TTL 3600 秒,记录返回的 mail_id。
    向这个 Mailbox 发送消息:'analysis_result_v1: insights=[A, B, C]'
    """
})

# ReportAgent:订阅同一个 Mailbox
report_agent.invoke({
    "input": "订阅 mail_id 为 {mail_id} 的 Mailbox,收到消息后生成报告。"
})

# MonitorAgent:同样订阅这个 Mailbox
monitor_agent.invoke({
    "input": "订阅 mail_id 为 {mail_id} 的 Mailbox,收到消息后更新监控面板。"
})

发送方只发一次,所有订阅了这个 Mailbox 的 Agent 都能收到消息。随时可以增加新的订阅方,发送方不需要任何改动。


与直接调用的对比

直接调用mq9 异步通信
耦合度发送方必须知道接收方接口发送方只需知道 Mailbox 名称
可用性接收方挂了,发送方失败接收方挂了,消息持久化等待
并发同步等待,阻塞发完即走,不阻塞
扩展性一对一调用一对多,随时增加接收方
历史消息不支持新订阅者自动获取未过期消息

完整可运行示例

两个 Agent 通过 mq9 异步通信:DataAgent 创建 Mailbox 并发送消息,AnalysisAgent 读取并处理。无需本地部署,直接连接 demo server。

python
import asyncio
from langchain_mq9 import Mq9Toolkit, CreateMailboxTool, SendMessageTool, GetMessagesTool
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI

SERVER = "nats://demo.robustmq.com:4222"

async def main():
    llm = ChatOpenAI(model="gpt-4o-mini")
    toolkit = Mq9Toolkit(server=SERVER)
    tools = toolkit.get_tools()

    # 1. 创建 Mailbox,拿到 mail_id
    mail_id = await CreateMailboxTool(server=SERVER)._arun(ttl=3600)
    print(f"Mailbox created: {mail_id}")

    # 2. DataAgent:发送消息,发完立即返回
    data_agent = initialize_agent(
        tools, llm,
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=True,
    )
    await data_agent.ainvoke({"input": f"向 mail_id 为 {mail_id} 的 Mailbox 发送两条消息:第一条优先级 critical,内容 'URGENT: anomaly detected in dataset_v1';第二条优先级 normal,内容 'dataset_v2: [record_1, record_2, record_3]'"})
    print("DataAgent: 消息发送完毕,继续处理下一个任务...")

    # 3. AnalysisAgent:读取 Mailbox 中的所有消息并处理
    # critical 消息优先返回,无论发送顺序如何
    analysis_agent = initialize_agent(
        tools, llm,
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=True,
    )
    await analysis_agent.ainvoke({"input": f"读取 mail_id 为 {mail_id} 的 Mailbox 中的所有消息,按优先级顺序输出每条消息内容并完成分析。"})

asyncio.run(main())

预期输出:

Mailbox created: m-550e8400-e29b-41d4-a716-446655440000
DataAgent: 消息发送完毕,继续处理下一个任务...

> Entering new AgentExecutor chain...
> Using tool: GetMessagesTool
Found 2 message(s):
  [1] priority=critical  content=URGENT: anomaly detected in dataset_v1
  [2] priority=normal    content=dataset_v2: [record_1, record_2, record_3]

critical 消息无论发送顺序如何,始终优先返回。


什么时候用 mq9

适合的场景:

  • Agent 之间有明显的生产者/消费者关系
  • 任务处理时间长,不想让上游等待
  • 需要缓冲,接收方处理能力有限
  • 一个结果需要发给多个 Agent 处理
  • Agent 可能离线,需要消息持久化

不需要用 mq9 的场景:

  • 需要立即得到返回值(用直接调用)
  • 简单的两步流程,没有并发需求

相关资源

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀