mq9 协议的MCP Server和 八个典型场景
本文档展示 LLM 通过 MCP 工具用 mq9 完成八种 Agent 通信场景的完整流程。
每个场景分两个阶段:
- 阶段一:相关 Agent 上线,创建 inbox、注册到注册中心、开始监听
- 阶段二:业务交互流程
MCP 配置
整个公司部署一个 mq9 broker,所有需要用 mq9 的 AI 工具都连同一个 MCP 端点。配置就一份:
{
"mcpServers": {
"mq9": {
"url": "http://mq9-broker.company.internal:8080/mcp"
}
}
}Agent 身份通过 mailbox 名字区分,不在 MCP 配置层。每个 Agent 调用 MCP 工具时通过 mailbox 参数告诉 broker 要操作哪个邮箱。
后面所有场景默认基于这份配置。
几个协议层面的概念
阅读后续场景前,需要先理解几个 mq9 协议的核心概念:
两种消费模式:
- 有状态消费:fetch 时传
group_name,broker 记录消费位点。同 group 多消费者负载均衡(每条消息只给一个)。适合任务竞争场景。 - 无状态消费:fetch 时不传
group_name,broker 不记录位点,每次按deliver策略独立消费。适合广播订阅、查看、调试场景。
消息级 TTL(mq9-ttl header):单条消息的存活时间,独立于 mailbox 的 TTL。常用于"状态上报"场景——mailbox 永不过期,但每条状态消息 N 秒后自动消失,没新消息进来就视为下线。
消息标签(mq9-tags header):逗号分隔的标签字符串(如 billing,vip),可通过 QUERY 的 tags 字段过滤。
消息 key(mq9-key header):同 key 消息存储层只保留最新一条。用于状态压实场景。
消息优先级(mq9-priority header):normal(默认)/ urgent / critical。fetch 时按优先级排序返回。
后续场景会用到这些概念。
场景一:子 Agent 完成任务,异步通知主 Agent
场景描述
主 Agent 把任务委派给执行 Agent,执行 Agent 处理完后异步把结果回传。两个 Agent 任意一方可离线、重启,消息不丢。
阶段一:Agent 上线
执行 Agent(数据分析师):
1. 创建邮箱
[create_mailbox(name="agent.analyzer.001.inbox")]
2. 注册自己的能力到注册中心
[register_agent(agent_card={
"name": "Data Analyzer",
"mailbox": "mq9://broker/agent.analyzer.001.inbox",
"skills": [{
"id": "anomaly-detection",
"tags": ["data.analysis", "anomaly"],
"examples": ["找出销售数据异常", "检测时序突变点"]
}]
})]
3. 监听任务(有状态消费,broker 记录位点支持续拉)
[fetch_messages(mail_address="agent.analyzer.001.inbox", group_name="analyzer-001")]
inbox 当前为空。从这里开始我可以离线,任务进来时 broker 会持久化保存。主 Agent(协调者):
1. 创建邮箱
[create_mailbox(name="agent.coordinator.001.inbox")]
2. 注册自己
[register_agent(agent_card={
"name": "Task Coordinator",
"mailbox": "mq9://broker/agent.coordinator.001.inbox",
"skills": [{"id": "task-coordination", "tags": ["coordination"]}]
})]
注册成功。阶段二:派发任务和接收结果
主 Agent 派发任务:
用户:销售数据需要做异常检测分析。
Claude:
1. 在注册中心找能做这件事的 Agent
[discover_agents(query="数据异常检测")]
找到 agent.analyzer.001.inbox
2. 创建回调 mailbox 接收结果
[create_mailbox(name="task.t001.callback", ttl=3600)]
(也可以复用主 Agent 自己的 inbox 接收,看业务需要)
3. 派发任务给执行 Agent,告诉他结果发到回调 mailbox
[send_message(
mail_address="agent.analyzer.001.inbox",
payload='{"task_id":"t-001","type":"anomaly_detection","data":"...","callback":"task.t001.callback"}'
)]
任务已派发。等结果回来,期间我可以离线。执行 Agent 处理任务:
1. 拉取新任务
[fetch_messages(mail_address="agent.analyzer.001.inbox", group_name="analyzer-001")]
收到 1 个任务:t-001 anomaly_detection
2. 处理任务
[执行分析逻辑...]
3. 把结果发到回调 mailbox
[send_message(
mail_address="task.t001.callback",
payload='{"task_id":"t-001","status":"completed","result":{...}}'
)]
4. 确认 inbox 消息处理完毕
[ack_message(mail_address="agent.analyzer.001.inbox", msg_id=0, group_name="analyzer-001")]主 Agent 接收结果:
1. 检查回调 mailbox(无状态消费,不需要 group_name,每次拉全量)
[fetch_messages(mail_address="task.t001.callback")]
收到结果:t-001 completed,发现 3 个异常点回调 mailbox 通常是一次性的,主 Agent 拿到结果后这个 mailbox 就没用了,等 TTL 自动清理。无状态消费在这种场景下省掉 group_name 和 ack 的负担。
用到的能力
create_mailbox register_agent discover_agents send_message fetch_messages(有状态/无状态两种用法)ack_message
场景二:主 Agent 感知所有子 Agent 状态
场景描述
主 Agent 关注若干 Worker 的运行状态。每个 Worker 把自己的状态发到一个固定 mailbox,用消息级 TTL 让状态自动过期。主 Agent 用 query_mailbox 一眼看完,没拿到就说明 Worker 下线了。
阶段一:Agent 上线
每个 Worker 创建自己的状态 mailbox,mailbox 本身永不过期:
1. Worker-001 创建状态 mailbox(永不过期)
[create_mailbox(name="status.worker-001")]
2. Worker-002 创建状态 mailbox(永不过期)
[create_mailbox(name="status.worker-002")]主 Agent 不需要订阅,直接 query 即可。
阶段二:状态上报和查询
Worker 一侧(每 30 秒上报一次):
1. 上报当前状态,用 mq9-key 让 mailbox 只保留最新一条
用 mq9-ttl 让这条状态消息 60 秒后自动过期
[send_message(
mail_address="status.worker-001",
payload='{"status":"running","load":0.3,"capabilities":["data.analysis"]}',
headers={"mq9-key": "status", "mq9-ttl": "60"}
)]mq9-key="status" 让 mailbox 里只保留最新一条,下次上报旧状态被覆盖。 mq9-ttl=60 让这条消息 60 秒后从 mailbox 消失。
Worker 持续上报(每 30 秒一次)相当于持续刷新最新状态——上一条消息没过期就被新消息覆盖。如果 Worker 挂了停止上报,最后一条状态消息 60 秒后自动消失,mailbox 变空。
主 Agent 一侧(按需查询):
用户:看一眼 worker-001 和 worker-002 当前状态
Claude:
1. 查 worker-001
[query_mailbox(mail_address="status.worker-001", key="status")]
worker-001: running, load 0.3
2. 查 worker-002
[query_mailbox(mail_address="status.worker-002", key="status")]
worker-002: 没有消息(推测已下线,最后一次状态上报 60 秒前过期)query_mailbox 不消费消息也不推进位点,纯查看。
用到的能力
create_mailbox send_message(带 mq9-key + mq9-ttl 两个 header)query_mailbox
场景三:任务广播,多个 Worker 竞争消费
场景描述
主 Agent 投递一批任务到一个公共 mailbox,多个 Worker 用同一个 group_name 拉取,broker 保证每个任务只给一个 Worker。Worker 处理完把结果回传到主 Agent 的回调 mailbox。
阶段一:Agent 上线
1. 主 Agent 创建公共任务 mailbox(多个 Worker 共用)
[create_mailbox(name="task.queue.analysis")]
2. 主 Agent 创建结果回调 mailbox(接收所有 Worker 的执行结果)
[create_mailbox(name="task.queue.analysis.results", ttl=3600)]每个 Worker 不需要单独 inbox,直接订阅这个公共任务 mailbox。
阶段二:派发和竞争消费
主 Agent 投递任务:
1. 投递 3 个任务,payload 里带 callback 指明结果发回到哪
[send_message(
mail_address="task.queue.analysis",
payload='{"task_id":"t-001","callback":"task.queue.analysis.results",...}'
)]
[send_message(
mail_address="task.queue.analysis",
payload='{"task_id":"t-002","callback":"task.queue.analysis.results",...}'
)]
[send_message(
mail_address="task.queue.analysis",
payload='{"task_id":"t-003","callback":"task.queue.analysis.results",...}'
)]多个 Worker 竞争消费(每个 Worker 用同一个 group_name):
Worker A:
1. 拉取任务(同 group_name 下 broker 自动负载均衡)
[fetch_messages(
mail_address="task.queue.analysis",
group_name="analysis-workers",
config={"num_msgs": 1}
)]
拿到 t-001
2. 处理任务
3. 把结果回传到主 Agent 的回调 mailbox
[send_message(
mail_address="task.queue.analysis.results",
payload='{"task_id":"t-001","status":"completed","result":{...}}'
)]
4. 确认任务消息已处理
[ack_message(mail_address="task.queue.analysis", msg_id=0, group_name="analysis-workers")]Worker B(同时也在 fetch):
1. 拉取任务
[fetch_messages(
mail_address="task.queue.analysis",
group_name="analysis-workers",
config={"num_msgs": 1}
)]
拿到 t-002(broker 保证不重复给到 Worker A)
2. 处理任务
3. 把结果回传
[send_message(
mail_address="task.queue.analysis.results",
payload='{"task_id":"t-002","status":"completed","result":{...}}'
)]
4. 确认
[ack_message(...)]主 Agent 接收所有结果:
1. 拉取所有 Worker 回传的结果(无状态消费,从最早开始拿全量)
[fetch_messages(
mail_address="task.queue.analysis.results",
deliver="earliest"
)]
收到 3 条结果:
- t-001 completed
- t-002 completed
- t-003 completed同一个 group_name 的多个 Worker 自动负载均衡。Worker 增减无需任何配置变更。所有结果汇总到同一个 mailbox,主 Agent 一次拉取就能拿到全部。
用到的能力
create_mailbox send_message fetch_messages(有状态消费实现 queue group + 无状态消费拿全量结果)ack_message
场景四:Agent 发现异常,广播告警
场景描述
监控 Agent 发现问题,发到公共告警 mailbox。多个处理方各自 fetch 同一个 mailbox 但都不传 group_name(无状态消费),每个处理方独立拿到全量告警。
阶段一:Agent 上线
1. 创建公共告警 mailbox
[create_mailbox(name="alert.payment.domain")]监控 Agent 是发送方。每个处理方各自启动并轮询。
阶段二:告警发送和多方接收
监控 Agent 发 critical 告警:
1. 发送告警,用 mq9-priority 设为 critical
[send_message(
mail_address="alert.payment.domain",
payload='{"level":"critical","detail":"支付超时率突增到15%"}',
headers={"mq9-priority": "critical"}
)]告警分析处理方(无状态消费):
1. 拉告警(不传 group_name,每次都从最新开始拿)
[fetch_messages(
mail_address="alert.payment.domain",
config={"max_wait_ms": 30000}
)]
拿到 1 条告警
2. 分析处理告警通知处理方(也是无状态消费):
1. 拉告警(同样不传 group_name,独立拿到全量)
[fetch_messages(
mail_address="alert.payment.domain",
config={"max_wait_ms": 30000}
)]
拿到同一条告警
2. 发短信、邮件、IM无状态消费下每个处理方各自独立 fetch,broker 不维护位点。两个处理方对告警的处理互不影响。max_wait_ms=30000 让 broker 在没新告警时等待最多 30 秒再返回,避免客户端轮询打爆服务端。
用到的能力
create_mailbox send_message(带 mq9-priority header)fetch_messages(无状态消费 + max_wait_ms 长轮询)
场景五:云端给离线边缘 Agent 发指令
场景描述
边缘设备网络不稳定可能长时间离线。云端按 critical / urgent / normal 三级优先级发送指令到边缘的 mailbox,设备上线后 broker 按优先级顺序推送。
阶段一:Agent 上线
边缘设备首次上线时创建自己的 inbox:
1. 创建邮箱
[create_mailbox(name="edge.device.001.inbox", ttl=86400)]云端不需要 inbox,直接往边缘 mailbox 发指令即可。
阶段二:发送和接收指令
云端发送(边缘可能离线,消息持久化等待):
1. 发 critical 指令
[send_message(
mail_address="edge.device.001.inbox",
payload='{"command":"emergency_stop","reason":"温度过高"}',
headers={"mq9-priority": "critical"}
)]
2. 发 urgent 指令
[send_message(
mail_address="edge.device.001.inbox",
payload='{"command":"update_config","interval":30}',
headers={"mq9-priority": "urgent"}
)]
3. 发 normal 指令(不带 header 即默认 normal)
[send_message(
mail_address="edge.device.001.inbox",
payload='{"command":"report_status"}'
)]边缘设备上线后:
1. 拉取积压指令(broker 按 critical → urgent → normal 排序返回)
[fetch_messages(
mail_address="edge.device.001.inbox",
group_name="edge-device-001",
deliver="earliest"
)]
返回 3 条:
- [critical] emergency_stop
- [urgent] update_config
- [normal] report_status
2. 按优先级处理并逐一 ack
[处理 critical]
[ack_message(msg_id=2, ...)]
[处理 urgent]
[ack_message(msg_id=1, ...)]
[处理 normal]
[ack_message(msg_id=0, ...)]
3. 兜底:用 query 看 mailbox 是否还有遗漏
[query_mailbox(mail_address="edge.device.001.inbox")]
mailbox 已清空。broker 按 critical → urgent → normal 顺序排序返回,client 不需要自己排。
用到的能力
create_mailbox send_message(带 mq9-priority header)fetch_messages(有状态消费) ack_message query_mailbox(兜底查询)
场景六:人机混合工作流(审批场景)
场景描述
Agent 需要人类审批才能继续。Agent 把审批请求发到审批员的 mailbox,审批员用任意 MCP 客户端拉取并审批,结果回到 Agent 的 mailbox。人和 Agent 用完全相同的协议,没有特殊审批服务。
阶段一:Agent 上线
审批员的 mailbox(一次创建,长期使用):
1. 审批员张三创建邮箱
[create_mailbox(name="approver.zhang.inbox")]Agent 创建自己的 inbox:
1. Agent 创建邮箱
[create_mailbox(name="agent.coordinator.001.inbox", ttl=3600)]阶段二:发起审批和回复
Agent 发起审批请求(urgent 优先级):
1. 发审批请求到张三的 mailbox
[send_message(
mail_address="approver.zhang.inbox",
payload='{
"type":"approval_request",
"content":"申请调用外部API,预计费用$50",
"callback":"agent.coordinator.001.inbox",
"correlation_id":"approval-001"
}',
headers={"mq9-priority": "urgent"}
)]
等待审批结果。审批员一侧(用 Claude Desktop 或其他 MCP 客户端):
用户:看看我的审批 mailbox 有什么待审批的
Claude:
1. 拉取待审批请求
[fetch_messages(mail_address="approver.zhang.inbox", group_name="zhang")]
1 条审批请求:申请调用外部API,预计费用$50
用户:批准
2. 把批准结果回复到 Agent 的 mailbox
[send_message(
mail_address="agent.coordinator.001.inbox",
payload='{"approved":true,"correlation_id":"approval-001"}'
)]
3. 确认审批请求已处理
[ack_message(mail_address="approver.zhang.inbox", msg_id=0, group_name="zhang")]Agent 收到审批结果:
1. 检查 inbox 是否有审批回复
[fetch_messages(mail_address="agent.coordinator.001.inbox", group_name="coordinator-001")]
approved=true, correlation_id=approval-001
继续后续流程。人和 Agent 走同一条路——都是创建 mailbox、发消息、收消息。审批员用的 Claude Desktop 背后调的就是 mq9 MCP 工具,和 Agent 完全对等。
用到的能力
create_mailbox send_message(带 mq9-priority header)fetch_messages ack_message
场景七:Agent A 向 Agent B 提问,B 可能不在线
场景描述
异步请求-响应模式。A 发问题到 B 的 mailbox,B 上线后处理完回到 A 的 mailbox。两边都不需要保持在线,靠 correlation_id 关联请求和响应。
阶段一:Agent 上线
A 和 B 各自创建 inbox:
1. A 创建邮箱
[create_mailbox(name="agent.worker.a.inbox", ttl=600)]
2. B 创建邮箱
[create_mailbox(name="agent.worker.b.inbox", ttl=3600)]阶段二:异步问答
A 提问(B 可能不在线):
1. 发问题到 B 的 mailbox,带 callback 和 correlation_id
[send_message(
mail_address="agent.worker.b.inbox",
payload='{
"question":"当前任务进度多少?",
"callback":"agent.worker.a.inbox",
"correlation_id":"q-001"
}'
)]
问题已发送,等 B 上线回复。B 上线后处理:
1. 拉问题
[fetch_messages(mail_address="agent.worker.b.inbox", group_name="worker-b")]
1 条提问:当前任务进度多少?
2. 把回复发到 A 的 mailbox
[send_message(
mail_address="agent.worker.a.inbox",
payload='{"answer":"进度 72%","correlation_id":"q-001"}'
)]
3. 确认问题已处理
[ack_message(...)]A 接收回复(A 可以期间离线,结果不丢):
1. 拉回复
[fetch_messages(mail_address="agent.worker.a.inbox", group_name="worker-a")]
answer: 进度 72%, correlation_id: q-001双向都异步。任何一方都不需要持续在线。
用到的能力
create_mailbox send_message fetch_messages ack_message
场景八:Agent 注册能力,其他 Agent 发现并调用
场景描述
Agent 通过 register_agent 把自己的 AgentCard(包含 skills、tags、自然语言描述)注册到 mq9 注册中心。其他 Agent 用 discover_agents 按自然语言或 tag 查询找到合适的 Agent。
阶段一:被发现的 Agent 上线
1. 创建邮箱
[create_mailbox(name="agent.analyzer.001.inbox")]
2. 把 AgentCard 注册到注册中心
[register_agent(agent_card={
"name": "Data Analyzer",
"description": "Statistical analysis on tabular data",
"mailbox": "mq9://broker/agent.analyzer.001.inbox",
"skills": [
{
"id": "data-analysis",
"name": "Tabular data analysis",
"tags": ["data.analysis", "statistics", "anomaly"],
"examples": ["分析销售数据异常", "找出趋势变化"]
},
{
"id": "report-gen",
"name": "Report generation",
"tags": ["report.generation"],
"examples": ["写 1000 字分析报告"]
}
]
})]阶段二:发现和调用
自然语言查询(语义检索):
用户:找一个能做数据异常检测的 Agent
Claude:
1. 用自然语言查询注册中心
[discover_agents(query="数据异常检测和统计分析")]
找到 1 个 Agent:
- agent.analyzer.001.inbox
- Data Analyzer: tabular data analysis, anomaly detection
2. 创建回调 mailbox
[create_mailbox(name="orchestrator.cb.t001", ttl=3600)]
3. 派发任务
[send_message(
mail_address="agent.analyzer.001.inbox",
payload='{"task":"anomaly_detection","data":"...","callback":"orchestrator.cb.t001"}'
)]tag 精确查询:
用户:找所有带 report.generation 标签的 Agent
Claude:
1. 用 tag 精确查询
[discover_agents(query="tag:report.generation")]
找到 2 个 Agent:
- agent.analyzer.001.inbox(Data Analyzer)
- agent.writer.005.inbox(Technical Writer)注册中心同时支持自然语言(语义检索)和 tag(精确匹配)两种检索方式。
用到的能力
register_agent discover_agents create_mailbox send_message
八个场景的能力对照
| 场景 | 主要工具组合 | mq9 协议关键能力 |
|---|---|---|
| 1. 异步任务回传 | create_mailbox + send_message + fetch_messages(有状态/无状态混用) | mailbox 持久化 + 临时回调 mailbox + TTL |
| 2. 全局状态感知 | send_message(mq9-key + mq9-ttl)+ query_mailbox | 消息级 TTL 实现自动下线感知 |
| 3. 任务竞争消费 | fetch_messages(同 group_name 多 Worker) | 有状态消费实现 queue group 负载均衡 |
| 4. 异常告警广播 | send_message(mq9-priority)+ fetch_messages(不传 group_name) | 无状态消费实现广播订阅 |
| 5. 离线边缘指令 | send_message(mq9-priority 三级)+ query_mailbox | 三级优先级 + 持久化 + 兜底查询 |
| 6. 人机审批工作流 | send_message(mq9-priority urgent)+ fetch_messages | 人机同协议 + 双向 mailbox |
| 7. 异步请求响应 | send_message + fetch_messages | 双向 mailbox + correlation_id |
| 8. 能力注册发现 | register_agent + discover_agents | 注册中心 + 语义/tag 检索 |
八个场景共用同一组 MCP 工具,覆盖了 Agent 通信的全部基础模式——同步/异步、单播/多播、点对点/广播、人机协作、能力发现。
