Skip to content

mq9 协议的MCP Server和 八个典型场景

本文档展示 LLM 通过 MCP 工具用 mq9 完成八种 Agent 通信场景的完整流程。

每个场景分两个阶段:

  • 阶段一:相关 Agent 上线,创建 inbox、注册到注册中心、开始监听
  • 阶段二:业务交互流程

MCP 配置

整个公司部署一个 mq9 broker,所有需要用 mq9 的 AI 工具都连同一个 MCP 端点。配置就一份:

json
{
  "mcpServers": {
    "mq9": {
      "url": "http://mq9-broker.company.internal:8080/mcp"
    }
  }
}

Agent 身份通过 mailbox 名字区分,不在 MCP 配置层。每个 Agent 调用 MCP 工具时通过 mailbox 参数告诉 broker 要操作哪个邮箱。

后面所有场景默认基于这份配置。

几个协议层面的概念

阅读后续场景前,需要先理解几个 mq9 协议的核心概念:

两种消费模式

  • 有状态消费:fetch 时传 group_name,broker 记录消费位点。同 group 多消费者负载均衡(每条消息只给一个)。适合任务竞争场景。
  • 无状态消费:fetch 时不传 group_name,broker 不记录位点,每次按 deliver 策略独立消费。适合广播订阅、查看、调试场景。

消息级 TTL(mq9-ttl header):单条消息的存活时间,独立于 mailbox 的 TTL。常用于"状态上报"场景——mailbox 永不过期,但每条状态消息 N 秒后自动消失,没新消息进来就视为下线。

消息标签(mq9-tags header):逗号分隔的标签字符串(如 billing,vip),可通过 QUERY 的 tags 字段过滤。

消息 key(mq9-key header):同 key 消息存储层只保留最新一条。用于状态压实场景。

消息优先级(mq9-priority header)normal(默认)/ urgent / critical。fetch 时按优先级排序返回。

后续场景会用到这些概念。


场景一:子 Agent 完成任务,异步通知主 Agent

场景描述

主 Agent 把任务委派给执行 Agent,执行 Agent 处理完后异步把结果回传。两个 Agent 任意一方可离线、重启,消息不丢。

阶段一:Agent 上线

执行 Agent(数据分析师):

1. 创建邮箱
[create_mailbox(name="agent.analyzer.001.inbox")]

2. 注册自己的能力到注册中心
[register_agent(agent_card={
  "name": "Data Analyzer",
  "mailbox": "mq9://broker/agent.analyzer.001.inbox",
  "skills": [{
    "id": "anomaly-detection",
    "tags": ["data.analysis", "anomaly"],
    "examples": ["找出销售数据异常", "检测时序突变点"]
  }]
})]

3. 监听任务(有状态消费,broker 记录位点支持续拉)
[fetch_messages(mail_address="agent.analyzer.001.inbox", group_name="analyzer-001")]
inbox 当前为空。从这里开始我可以离线,任务进来时 broker 会持久化保存。

主 Agent(协调者):

1. 创建邮箱
[create_mailbox(name="agent.coordinator.001.inbox")]

2. 注册自己
[register_agent(agent_card={
  "name": "Task Coordinator",
  "mailbox": "mq9://broker/agent.coordinator.001.inbox",
  "skills": [{"id": "task-coordination", "tags": ["coordination"]}]
})]
注册成功。

阶段二:派发任务和接收结果

主 Agent 派发任务:

用户:销售数据需要做异常检测分析。

Claude:

1. 在注册中心找能做这件事的 Agent
[discover_agents(query="数据异常检测")]
找到 agent.analyzer.001.inbox

2. 创建回调 mailbox 接收结果
[create_mailbox(name="task.t001.callback", ttl=3600)]
(也可以复用主 Agent 自己的 inbox 接收,看业务需要)

3. 派发任务给执行 Agent,告诉他结果发到回调 mailbox
[send_message(
  mail_address="agent.analyzer.001.inbox",
  payload='{"task_id":"t-001","type":"anomaly_detection","data":"...","callback":"task.t001.callback"}'
)]
任务已派发。等结果回来,期间我可以离线。

执行 Agent 处理任务:

1. 拉取新任务
[fetch_messages(mail_address="agent.analyzer.001.inbox", group_name="analyzer-001")]
收到 1 个任务:t-001 anomaly_detection

2. 处理任务
[执行分析逻辑...]

3. 把结果发到回调 mailbox
[send_message(
  mail_address="task.t001.callback",
  payload='{"task_id":"t-001","status":"completed","result":{...}}'
)]

4. 确认 inbox 消息处理完毕
[ack_message(mail_address="agent.analyzer.001.inbox", msg_id=0, group_name="analyzer-001")]

主 Agent 接收结果:

1. 检查回调 mailbox(无状态消费,不需要 group_name,每次拉全量)
[fetch_messages(mail_address="task.t001.callback")]
收到结果:t-001 completed,发现 3 个异常点

回调 mailbox 通常是一次性的,主 Agent 拿到结果后这个 mailbox 就没用了,等 TTL 自动清理。无状态消费在这种场景下省掉 group_name 和 ack 的负担。

用到的能力

create_mailbox register_agent discover_agents send_message fetch_messages(有状态/无状态两种用法)ack_message


场景二:主 Agent 感知所有子 Agent 状态

场景描述

主 Agent 关注若干 Worker 的运行状态。每个 Worker 把自己的状态发到一个固定 mailbox,用消息级 TTL 让状态自动过期。主 Agent 用 query_mailbox 一眼看完,没拿到就说明 Worker 下线了。

阶段一:Agent 上线

每个 Worker 创建自己的状态 mailbox,mailbox 本身永不过期:

1. Worker-001 创建状态 mailbox(永不过期)
[create_mailbox(name="status.worker-001")]

2. Worker-002 创建状态 mailbox(永不过期)
[create_mailbox(name="status.worker-002")]

主 Agent 不需要订阅,直接 query 即可。

阶段二:状态上报和查询

Worker 一侧(每 30 秒上报一次):

1. 上报当前状态,用 mq9-key 让 mailbox 只保留最新一条
   用 mq9-ttl 让这条状态消息 60 秒后自动过期
[send_message(
  mail_address="status.worker-001",
  payload='{"status":"running","load":0.3,"capabilities":["data.analysis"]}',
  headers={"mq9-key": "status", "mq9-ttl": "60"}
)]

mq9-key="status" 让 mailbox 里只保留最新一条,下次上报旧状态被覆盖。 mq9-ttl=60 让这条消息 60 秒后从 mailbox 消失。

Worker 持续上报(每 30 秒一次)相当于持续刷新最新状态——上一条消息没过期就被新消息覆盖。如果 Worker 挂了停止上报,最后一条状态消息 60 秒后自动消失,mailbox 变空。

主 Agent 一侧(按需查询):

用户:看一眼 worker-001 和 worker-002 当前状态

Claude:

1. 查 worker-001
[query_mailbox(mail_address="status.worker-001", key="status")]
worker-001: running, load 0.3

2. 查 worker-002
[query_mailbox(mail_address="status.worker-002", key="status")]
worker-002: 没有消息(推测已下线,最后一次状态上报 60 秒前过期)

query_mailbox 不消费消息也不推进位点,纯查看。

用到的能力

create_mailbox send_message(带 mq9-key + mq9-ttl 两个 header)query_mailbox


场景三:任务广播,多个 Worker 竞争消费

场景描述

主 Agent 投递一批任务到一个公共 mailbox,多个 Worker 用同一个 group_name 拉取,broker 保证每个任务只给一个 Worker。Worker 处理完把结果回传到主 Agent 的回调 mailbox。

阶段一:Agent 上线

1. 主 Agent 创建公共任务 mailbox(多个 Worker 共用)
[create_mailbox(name="task.queue.analysis")]

2. 主 Agent 创建结果回调 mailbox(接收所有 Worker 的执行结果)
[create_mailbox(name="task.queue.analysis.results", ttl=3600)]

每个 Worker 不需要单独 inbox,直接订阅这个公共任务 mailbox。

阶段二:派发和竞争消费

主 Agent 投递任务:

1. 投递 3 个任务,payload 里带 callback 指明结果发回到哪
[send_message(
  mail_address="task.queue.analysis",
  payload='{"task_id":"t-001","callback":"task.queue.analysis.results",...}'
)]
[send_message(
  mail_address="task.queue.analysis",
  payload='{"task_id":"t-002","callback":"task.queue.analysis.results",...}'
)]
[send_message(
  mail_address="task.queue.analysis",
  payload='{"task_id":"t-003","callback":"task.queue.analysis.results",...}'
)]

多个 Worker 竞争消费(每个 Worker 用同一个 group_name):

Worker A:

1. 拉取任务(同 group_name 下 broker 自动负载均衡)
[fetch_messages(
  mail_address="task.queue.analysis",
  group_name="analysis-workers",
  config={"num_msgs": 1}
)]
拿到 t-001

2. 处理任务

3. 把结果回传到主 Agent 的回调 mailbox
[send_message(
  mail_address="task.queue.analysis.results",
  payload='{"task_id":"t-001","status":"completed","result":{...}}'
)]

4. 确认任务消息已处理
[ack_message(mail_address="task.queue.analysis", msg_id=0, group_name="analysis-workers")]
Worker B(同时也在 fetch):

1. 拉取任务
[fetch_messages(
  mail_address="task.queue.analysis",
  group_name="analysis-workers",
  config={"num_msgs": 1}
)]
拿到 t-002(broker 保证不重复给到 Worker A)

2. 处理任务

3. 把结果回传
[send_message(
  mail_address="task.queue.analysis.results",
  payload='{"task_id":"t-002","status":"completed","result":{...}}'
)]

4. 确认
[ack_message(...)]

主 Agent 接收所有结果:

1. 拉取所有 Worker 回传的结果(无状态消费,从最早开始拿全量)
[fetch_messages(
  mail_address="task.queue.analysis.results",
  deliver="earliest"
)]
收到 3 条结果:
- t-001 completed
- t-002 completed
- t-003 completed

同一个 group_name 的多个 Worker 自动负载均衡。Worker 增减无需任何配置变更。所有结果汇总到同一个 mailbox,主 Agent 一次拉取就能拿到全部。

用到的能力

create_mailbox send_message fetch_messages(有状态消费实现 queue group + 无状态消费拿全量结果)ack_message


场景四:Agent 发现异常,广播告警

场景描述

监控 Agent 发现问题,发到公共告警 mailbox。多个处理方各自 fetch 同一个 mailbox 但都不传 group_name(无状态消费),每个处理方独立拿到全量告警。

阶段一:Agent 上线

1. 创建公共告警 mailbox
[create_mailbox(name="alert.payment.domain")]

监控 Agent 是发送方。每个处理方各自启动并轮询。

阶段二:告警发送和多方接收

监控 Agent 发 critical 告警:

1. 发送告警,用 mq9-priority 设为 critical
[send_message(
  mail_address="alert.payment.domain",
  payload='{"level":"critical","detail":"支付超时率突增到15%"}',
  headers={"mq9-priority": "critical"}
)]

告警分析处理方(无状态消费):

1. 拉告警(不传 group_name,每次都从最新开始拿)
[fetch_messages(
  mail_address="alert.payment.domain",
  config={"max_wait_ms": 30000}
)]
拿到 1 条告警

2. 分析处理

告警通知处理方(也是无状态消费):

1. 拉告警(同样不传 group_name,独立拿到全量)
[fetch_messages(
  mail_address="alert.payment.domain",
  config={"max_wait_ms": 30000}
)]
拿到同一条告警

2. 发短信、邮件、IM

无状态消费下每个处理方各自独立 fetch,broker 不维护位点。两个处理方对告警的处理互不影响。max_wait_ms=30000 让 broker 在没新告警时等待最多 30 秒再返回,避免客户端轮询打爆服务端。

用到的能力

create_mailbox send_message(带 mq9-priority header)fetch_messages(无状态消费 + max_wait_ms 长轮询)


场景五:云端给离线边缘 Agent 发指令

场景描述

边缘设备网络不稳定可能长时间离线。云端按 critical / urgent / normal 三级优先级发送指令到边缘的 mailbox,设备上线后 broker 按优先级顺序推送。

阶段一:Agent 上线

边缘设备首次上线时创建自己的 inbox:

1. 创建邮箱
[create_mailbox(name="edge.device.001.inbox", ttl=86400)]

云端不需要 inbox,直接往边缘 mailbox 发指令即可。

阶段二:发送和接收指令

云端发送(边缘可能离线,消息持久化等待):

1. 发 critical 指令
[send_message(
  mail_address="edge.device.001.inbox",
  payload='{"command":"emergency_stop","reason":"温度过高"}',
  headers={"mq9-priority": "critical"}
)]

2. 发 urgent 指令
[send_message(
  mail_address="edge.device.001.inbox",
  payload='{"command":"update_config","interval":30}',
  headers={"mq9-priority": "urgent"}
)]

3. 发 normal 指令(不带 header 即默认 normal)
[send_message(
  mail_address="edge.device.001.inbox",
  payload='{"command":"report_status"}'
)]

边缘设备上线后:

1. 拉取积压指令(broker 按 critical → urgent → normal 排序返回)
[fetch_messages(
  mail_address="edge.device.001.inbox",
  group_name="edge-device-001",
  deliver="earliest"
)]
返回 3 条:
- [critical] emergency_stop
- [urgent] update_config
- [normal] report_status

2. 按优先级处理并逐一 ack
[处理 critical]
[ack_message(msg_id=2, ...)]
[处理 urgent]
[ack_message(msg_id=1, ...)]
[处理 normal]
[ack_message(msg_id=0, ...)]

3. 兜底:用 query 看 mailbox 是否还有遗漏
[query_mailbox(mail_address="edge.device.001.inbox")]
mailbox 已清空。

broker 按 critical → urgent → normal 顺序排序返回,client 不需要自己排。

用到的能力

create_mailbox send_message(带 mq9-priority header)fetch_messages(有状态消费) ack_message query_mailbox(兜底查询)


场景六:人机混合工作流(审批场景)

场景描述

Agent 需要人类审批才能继续。Agent 把审批请求发到审批员的 mailbox,审批员用任意 MCP 客户端拉取并审批,结果回到 Agent 的 mailbox。人和 Agent 用完全相同的协议,没有特殊审批服务。

阶段一:Agent 上线

审批员的 mailbox(一次创建,长期使用):

1. 审批员张三创建邮箱
[create_mailbox(name="approver.zhang.inbox")]

Agent 创建自己的 inbox:

1. Agent 创建邮箱
[create_mailbox(name="agent.coordinator.001.inbox", ttl=3600)]

阶段二:发起审批和回复

Agent 发起审批请求(urgent 优先级):

1. 发审批请求到张三的 mailbox
[send_message(
  mail_address="approver.zhang.inbox",
  payload='{
    "type":"approval_request",
    "content":"申请调用外部API,预计费用$50",
    "callback":"agent.coordinator.001.inbox",
    "correlation_id":"approval-001"
  }',
  headers={"mq9-priority": "urgent"}
)]
等待审批结果。

审批员一侧(用 Claude Desktop 或其他 MCP 客户端):

用户:看看我的审批 mailbox 有什么待审批的

Claude:

1. 拉取待审批请求
[fetch_messages(mail_address="approver.zhang.inbox", group_name="zhang")]
1 条审批请求:申请调用外部API,预计费用$50

用户:批准

2. 把批准结果回复到 Agent 的 mailbox
[send_message(
  mail_address="agent.coordinator.001.inbox",
  payload='{"approved":true,"correlation_id":"approval-001"}'
)]

3. 确认审批请求已处理
[ack_message(mail_address="approver.zhang.inbox", msg_id=0, group_name="zhang")]

Agent 收到审批结果:

1. 检查 inbox 是否有审批回复
[fetch_messages(mail_address="agent.coordinator.001.inbox", group_name="coordinator-001")]
approved=true, correlation_id=approval-001
继续后续流程。

人和 Agent 走同一条路——都是创建 mailbox、发消息、收消息。审批员用的 Claude Desktop 背后调的就是 mq9 MCP 工具,和 Agent 完全对等。

用到的能力

create_mailbox send_message(带 mq9-priority header)fetch_messages ack_message


场景七:Agent A 向 Agent B 提问,B 可能不在线

场景描述

异步请求-响应模式。A 发问题到 B 的 mailbox,B 上线后处理完回到 A 的 mailbox。两边都不需要保持在线,靠 correlation_id 关联请求和响应。

阶段一:Agent 上线

A 和 B 各自创建 inbox:

1. A 创建邮箱
[create_mailbox(name="agent.worker.a.inbox", ttl=600)]

2. B 创建邮箱
[create_mailbox(name="agent.worker.b.inbox", ttl=3600)]

阶段二:异步问答

A 提问(B 可能不在线):

1. 发问题到 B 的 mailbox,带 callback 和 correlation_id
[send_message(
  mail_address="agent.worker.b.inbox",
  payload='{
    "question":"当前任务进度多少?",
    "callback":"agent.worker.a.inbox",
    "correlation_id":"q-001"
  }'
)]
问题已发送,等 B 上线回复。

B 上线后处理:

1. 拉问题
[fetch_messages(mail_address="agent.worker.b.inbox", group_name="worker-b")]
1 条提问:当前任务进度多少?

2. 把回复发到 A 的 mailbox
[send_message(
  mail_address="agent.worker.a.inbox",
  payload='{"answer":"进度 72%","correlation_id":"q-001"}'
)]

3. 确认问题已处理
[ack_message(...)]

A 接收回复(A 可以期间离线,结果不丢):

1. 拉回复
[fetch_messages(mail_address="agent.worker.a.inbox", group_name="worker-a")]
answer: 进度 72%, correlation_id: q-001

双向都异步。任何一方都不需要持续在线。

用到的能力

create_mailbox send_message fetch_messages ack_message


场景八:Agent 注册能力,其他 Agent 发现并调用

场景描述

Agent 通过 register_agent 把自己的 AgentCard(包含 skills、tags、自然语言描述)注册到 mq9 注册中心。其他 Agent 用 discover_agents 按自然语言或 tag 查询找到合适的 Agent。

阶段一:被发现的 Agent 上线

1. 创建邮箱
[create_mailbox(name="agent.analyzer.001.inbox")]

2. 把 AgentCard 注册到注册中心
[register_agent(agent_card={
  "name": "Data Analyzer",
  "description": "Statistical analysis on tabular data",
  "mailbox": "mq9://broker/agent.analyzer.001.inbox",
  "skills": [
    {
      "id": "data-analysis",
      "name": "Tabular data analysis",
      "tags": ["data.analysis", "statistics", "anomaly"],
      "examples": ["分析销售数据异常", "找出趋势变化"]
    },
    {
      "id": "report-gen",
      "name": "Report generation",
      "tags": ["report.generation"],
      "examples": ["写 1000 字分析报告"]
    }
  ]
})]

阶段二:发现和调用

自然语言查询(语义检索):

用户:找一个能做数据异常检测的 Agent

Claude:

1. 用自然语言查询注册中心
[discover_agents(query="数据异常检测和统计分析")]
找到 1 个 Agent:
- agent.analyzer.001.inbox
- Data Analyzer: tabular data analysis, anomaly detection

2. 创建回调 mailbox
[create_mailbox(name="orchestrator.cb.t001", ttl=3600)]

3. 派发任务
[send_message(
  mail_address="agent.analyzer.001.inbox",
  payload='{"task":"anomaly_detection","data":"...","callback":"orchestrator.cb.t001"}'
)]

tag 精确查询:

用户:找所有带 report.generation 标签的 Agent

Claude:

1. 用 tag 精确查询
[discover_agents(query="tag:report.generation")]
找到 2 个 Agent:
- agent.analyzer.001.inbox(Data Analyzer)
- agent.writer.005.inbox(Technical Writer)

注册中心同时支持自然语言(语义检索)和 tag(精确匹配)两种检索方式。

用到的能力

register_agent discover_agents create_mailbox send_message


八个场景的能力对照

场景主要工具组合mq9 协议关键能力
1. 异步任务回传create_mailbox + send_message + fetch_messages(有状态/无状态混用)mailbox 持久化 + 临时回调 mailbox + TTL
2. 全局状态感知send_message(mq9-key + mq9-ttl)+ query_mailbox消息级 TTL 实现自动下线感知
3. 任务竞争消费fetch_messages(同 group_name 多 Worker)有状态消费实现 queue group 负载均衡
4. 异常告警广播send_message(mq9-priority)+ fetch_messages(不传 group_name)无状态消费实现广播订阅
5. 离线边缘指令send_message(mq9-priority 三级)+ query_mailbox三级优先级 + 持久化 + 兜底查询
6. 人机审批工作流send_message(mq9-priority urgent)+ fetch_messages人机同协议 + 双向 mailbox
7. 异步请求响应send_message + fetch_messages双向 mailbox + correlation_id
8. 能力注册发现register_agent + discover_agents注册中心 + 语义/tag 检索

八个场景共用同一组 MCP 工具,覆盖了 Agent 通信的全部基础模式——同步/异步、单播/多播、点对点/广播、人机协作、能力发现。

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀