mq9 vs 现有方案:八个场景逐一对比
Agent 之间的通信,今天没有标准答案。大家都在用手边的工具绕——HTTP、Redis、Kafka、自研队列。能用,但每个场景都要加东西。
这篇文章把 mq9 要解决的八个核心场景拿出来,逐一对比现有方案怎么做、痛点在哪、mq9 怎么做、优势有多大。不吹不黑,摆事实。
协议基础:mq9 的命令字只有四个:
MAILBOX.CREATE、MAILBOX.MSG.{mail_address}、MAILBOX.LIST.{mail_address}、MAILBOX.DELETE.{mail_address}.{msg_id}。订阅用MAILBOX.MSG.{mail_address}加 queue group。所有操作都需要知道精确的 mail_address,禁止通配符跨邮箱订阅。
场景一:子 Agent 完成任务,异步通知主 Agent
HTTP:子 Agent 回调主 Agent 的 webhook。主 Agent 必须在线且暴露端点。重启、网络抖动,回调丢了,需要自己做重试、幂等、超时处理。每个项目写一遍。
Redis:子 Agent publish 到 channel,主 Agent subscribe。主 Agent 不在线那一刻消息直接丢了。要做可靠投递,得自己加 Redis Stream 或 List,本质上在 Redis 上手搓消息队列。
Kafka:创建 topic,子 Agent produce,主 Agent consume。能用,但需要预先创建 topic、配置 partition、配置 retention、管理 consumer group 和 offset。一个简单的任务回传,引入了一整套 Kafka 运维。
mq9:主 Agent 用 MAILBOX.CREATE 拿到 mail_address,子 Agent 用 MAILBOX.MSG.{mail_address} 发送结果。在线实时收到,不在线消息持久化存储等上线后推送。用 MAILBOX.LIST 可主动拉取确认不遗漏。
# 主 Agent 创建邮箱
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":3600}'
# → {"mail_address":"abc123"}
# 子 Agent 发结果(normal 优先级)
nats request '$mq9.AI.MAILBOX.MSG.abc123' '{"task_id":"t-001","result":"分析完成"}'
# → {"msg_id":0}
# 主 Agent 订阅接收(含历史未读消息)
nats subscribe '$mq9.AI.MAILBOX.MSG.abc123'
# 兜底:主动拉取查看邮箱内容
nats request '$mq9.AI.MAILBOX.LIST.abc123' ''优势:中。相比 HTTP 不需要对方在线,相比 Redis 天然持久化,相比 Kafka 零运维配置。
场景二:主 Agent 感知所有子 Agent 状态
HTTP 轮询:主 Agent 定时请求每个子 Agent 的 /health 端点。需要维护 Agent 列表,新增要注册,消亡要清理。n 个 Agent 就是 n 个请求,O(n) 开销。间隔太长感知延迟大,太短浪费资源。
Redis:每个 Agent 写一个 key 带 TTL,主 Agent 用 SCAN 遍历。O(n) 操作,Agent 多了性能差。Redis 没有"订阅 key 变化"的可靠原生能力,只能轮询。
Kafka:每个 Agent 往 compacted topic 写状态。能实现,但清理不是实时的,有延迟。每次写入都是一条持久化日志,只为"最新状态"付出了全量持久化的代价。
mq9:每个 Worker 启动时用约定好的公开邮箱名(如 status.worker-001)CREATE 邮箱,定期用 MAILBOX.MSG 上报状态,TTL 决定邮箱寿命。主 Agent 订阅每个 Worker 的邮箱感知状态;发现 Worker 不再更新时,TTL 到期邮箱自动消亡。Worker 下线无需主动注销。
# Worker-001 启动,注册公开状态邮箱
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":60,"public":true,"name":"status.worker-001","desc":"worker状态"}'
# Worker-001 定期上报状态(每 30 秒)
nats request '$mq9.AI.MAILBOX.MSG.status.worker-001' '{"status":"running","load":0.3}'
# 主 Agent 订阅已知 Worker 的状态邮箱
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker-001'
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker-002'优势:高。零轮询,TTL 自动感知消亡,新 Worker 用约定名字注册后主 Agent 按需订阅。
场景三:任务广播,多个 Worker 竞争消费
HTTP:主 Agent 自己实现负载均衡,维护 Worker 列表,选一个发请求。Worker 挂了要重试换一个。自己写调度器,自己处理故障转移。
Redis:主 Agent LPUSH 到 List,Worker 用 BRPOP 竞争获取。最常见的方案,但没有消息确认——Worker BRPOP 拿走后崩溃,任务丢了,需要自己做可靠队列。没有优先级,没有广播能力。
Kafka:发到 topic,多个 consumer 在同一个 group 里竞争。能用,但 partition 数决定最大并行度,动态伸缩 Worker 需要 rebalance,rebalance 期间有消费停顿。
mq9:创建一个公开任务邮箱(如 task.queue),主 Agent 用 MAILBOX.MSG 投递任务,多个 Worker 用 queue group 订阅竞争消费,每条消息只有一个 Worker 抢到。Worker 动态加减无需任何配置变更。
# 主 Agent 创建任务公开邮箱(一次性)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"task.queue","desc":"任务队列"}'
# 主 Agent 投递任务
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-001","type":"data_analysis"}'
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-002","type":"data_analysis"}'
# Worker 们用 queue group 竞争消费
nats subscribe '$mq9.AI.MAILBOX.MSG.task.queue' --queue task.workers
# Worker-1、Worker-2、Worker-3 各自运行上面这条命令,每条消息只被一个抢到优势:中。相比 HTTP 无需维护 Worker 列表,相比 Redis BRPOP 无丢失风险,相比 Kafka 无 rebalance。但 mq9 处于早期阶段,生产稳定性需验证。
场景四:Agent 发现异常,广播告警
HTTP:发现异常的 Agent 需要知道谁有处理能力,维护订阅者列表,逐一 POST。列表维护本身就是分布式问题。
Redis:PUBLISH 到 channel,所有订阅者收到。能用,但不在线的收不到,没有持久化。告警发出那一刻处理方刚好重启,告警就丢了。
Kafka:发到 topic,每个处理方用独立 consumer group 消费。能实现一发多收,但每个处理方都要配 consumer group,topic 要预先创建。
mq9:创建一个公开告警邮箱(如 alert.payment),监控 Agent 发告警到此邮箱,所有关注的 Agent 各自订阅这个邮箱接收。消息持久化,重要告警上线后补收不遗漏。发布方不需要知道谁在听。
# 创建公开告警邮箱(一次性)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"alert.payment","desc":"支付域告警"}'
# 监控 Agent 发现异常,发 critical 告警
nats request '$mq9.AI.MAILBOX.MSG.alert.payment.critical' '{"level":"critical","detail":"支付超时率突增到15%"}'
# 告警处理 Agent A 订阅
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'
# 告警处理 Agent B 也订阅(同一邮箱,各自独立收到)
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'优势:中。相比 HTTP 无需维护订阅者列表,相比 Redis 持久化不丢失,相比 Kafka 无需预先配置 consumer group。但每个处理方需要知道告警邮箱地址(公开邮箱名字固定,提前约定即可)。
场景五:云端给离线边缘 Agent 发指令
HTTP:直接失败。边缘不在线,请求超时。需要自建重试队列、指数退避、持久化未发送指令。本质上在云端手搓消息队列。
Redis:写到 List 等边缘上线消费。但没有优先级,紧急指令和普通指令混在一起排队。要做优先级得用 Sorted Set,复杂度上去了。Redis 是内存数据库,积压量大成本高。
Kafka:写到 topic,边缘上线后消费。能用,但没有原生优先级——partition 内严格 FIFO,紧急指令要走单独 topic,消费端要同时订阅多个 topic 自己做优先级调度。
mq9:每个边缘 Agent 有自己的 mail_address(上线时创建邮箱并告知云端)。云端用 MAILBOX.MSG.{mail_address}.critical 发紧急指令,.urgent 发重要指令,不加后缀发普通指令。边缘上线后订阅自己邮箱,服务端按 critical → urgent → normal 顺序推送。用 MAILBOX.LIST 兜底确认不遗漏。
# 云端:发紧急指令(边缘可能不在线,消息持久化等待)
nats request '$mq9.AI.MAILBOX.MSG.edge-device.critical' '{"command":"emergency_stop","reason":"温度过高"}'
nats request '$mq9.AI.MAILBOX.MSG.edge-device.urgent' '{"command":"update_config","interval":30}'
nats request '$mq9.AI.MAILBOX.MSG.edge-device' '{"command":"report_status"}'
# 边缘 Agent 上线后订阅自己邮箱(按优先级推送)
nats subscribe '$mq9.AI.MAILBOX.MSG.edge-device'
# 兜底:主动拉取检查是否有遗漏
nats request '$mq9.AI.MAILBOX.LIST.edge-device' ''优势:高。相比 HTTP 无需对方在线,相比 Redis 有三级优先级且持久化成本低(RocksDB),相比 Kafka 优先级原生支持无需多 topic。
场景六:人机混合工作流
HTTP + 前端:Agent 调审批服务 API 创建审批单,前端轮询或 WebSocket 推送给审批员,审批员操作后回调 Agent。至少三个系统对接:Agent → 审批服务 → 前端 → 审批服务 → Agent。
Redis:Agent 写审批请求到 key,审批员客户端轮询,审批后写结果到另一个 key,Agent 轮询结果。两端都在轮询,延迟高,逻辑分散。
Kafka:审批请求发一个 topic,审批结果发另一个 topic。能用,但审批员需要 Kafka 消费客户端,对非技术人员不友好。
mq9:审批员有自己的 mail_address(可用约定名,如 approver.zhang)。Agent 用 MAILBOX.MSG.{审批员mail_address}.urgent 发审批请求,审批员的客户端订阅自己邮箱,审批后用 MAILBOX.MSG.{Agent的mail_address} 回复结果。人类和 Agent 用完全相同的协议,无需额外审批服务。payload 带 correlation_id 保证配对。
# 审批员邮箱(预先创建公开邮箱,名字即地址)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"approver.zhang","desc":"张三审批邮箱"}'
# Agent 发审批请求(urgent 优先级)
nats request '$mq9.AI.MAILBOX.MSG.approver.zhang.urgent' '{
"from":"agent",
"type":"approval_request",
"correlation_id":"approval-001",
"content":"申请调用外部API,预计费用$50"
}'
# 审批员客户端订阅自己邮箱
nats subscribe '$mq9.AI.MAILBOX.MSG.approver.zhang'
# 审批员审批后,回复到 Agent 邮箱
nats request '$mq9.AI.MAILBOX.MSG.agent' '{
"from":"approver.zhang",
"approved":true,
"correlation_id":"approval-001"
}'
# Agent 订阅自己邮箱等待审批结果
nats subscribe '$mq9.AI.MAILBOX.MSG.agent'优势:高。人机同一套协议,无需额外审批服务,消息持久化,审批员离线不丢。
场景七:Agent A 向 Agent B 提问,B 可能不在线
HTTP:B 不在线直接失败。要做异步,得引入回调 URL、correlation_id、超时重试、状态机——复杂度爆炸。
Redis:A 写请求到 B 的 List,B 处理后写结果到 A 的 List。两个 List,两端轮询,自己管 correlation_id 配对。能用,代码量不小。
Kafka:A 发请求到 request topic 带 reply_to header,B 消费处理后发到 reply topic。两个 topic、两个 consumer group,A 还要从 reply topic 按 correlation_id 过滤属于自己的响应。
mq9:A 创建自己的私有邮箱,用 MAILBOX.MSG.{B的mail_address} 发问题,payload 带 reply_to(A 的 mail_address)和 correlation_id。B 上线后订阅自己邮箱,处理后用 MAILBOX.MSG.{reply_to} 回复。B 不在线时请求持久化在 B 的邮箱里,等 B 上线推送。
# A 创建自己的邮箱用于接收回复
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":300}'
# → {"mail_address":"agent-a"}
# A 向 B 提问,带 reply_to
nats request '$mq9.AI.MAILBOX.MSG.agent-b' '{
"from":"agent-a",
"question":"当前进度多少?",
"reply_to":"agent-a",
"correlation_id":"req-001"
}'
# B 订阅自己邮箱(B 上线时执行)
nats subscribe '$mq9.AI.MAILBOX.MSG.agent-b'
# B 处理后回复到 reply_to
nats request '$mq9.AI.MAILBOX.MSG.agent-a' '{
"from":"agent-b",
"answer":"当前进度72%",
"correlation_id":"req-001"
}'
# A 订阅自己邮箱等回复
nats subscribe '$mq9.AI.MAILBOX.MSG.agent-a'优势:中。B 不在线不丢消息,A 也不需要在线等待,双方完全异步。相比 Kafka 无需两个 topic 和 correlation_id 过滤逻辑。
场景八:Agent 注册能力,其他 Agent 发现它
HTTP + 注册中心:需要中心化的服务注册中心(Consul、Eureka、自建)。Agent 启动注册,消亡注销或靠健康检查剔除。引入额外有状态服务,本身需要高可用部署。
Redis:Agent 把能力写到 Hash 或 Set 带 TTL,查询方 SCAN 或 SMEMBERS。能用,但没有订阅能力变化的机制,只能轮询。Agent 多了 SCAN 性能差。
Kafka:能力声明写到 compacted topic,查询方消费整个 topic 构建本地视图。延迟高,实现复杂,为简单的服务发现引入了完整的 Kafka 消费链路。
mq9:每个 Agent 启动时用约定名字创建公开邮箱(如 capability.worker-001),并发送一条能力声明消息。TTL 决定注册有效期,不续期自动到期。需要发现能力的 Agent 创建一个公开查询邮箱(如 capability.query),广播查询需求到此邮箱,有能力的 Agent 订阅并回复到发现方邮箱。
# Worker-001 注册能力(TTL=60,需定期续期)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.worker-001","ttl":60}'
nats request '$mq9.AI.MAILBOX.MSG.capability.worker-001' '{
"capabilities":["data.analysis","report.generation"],
"load":0.3,
"reply_to":"capability.worker-001"
}'
# 编排器创建查询邮箱(一次性)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.query"}'
# Worker 订阅查询邮箱,有需求时回复
nats subscribe '$mq9.AI.MAILBOX.MSG.capability.query'
# Worker-001 订阅后,收到查询时回复到编排器邮箱
# 编排器广播查询:谁有 data.analysis 能力?
nats request '$mq9.AI.MAILBOX.MSG.capability.query' '{
"from":"orchestrator",
"need":"data.analysis",
"reply_to":"orchestrator"
}'
# 编排器订阅自己邮箱等待响应
nats subscribe '$mq9.AI.MAILBOX.MSG.orchestrator'优势:高。零额外注册中心,TTL 自动感知 Agent 消亡,去中心化。
总结
| 场景 | HTTP | Redis | Kafka | mq9 | mq9 优势 |
|---|---|---|---|---|---|
| 子 Agent 完成任务,异步通知主 Agent | 需对方在线,自建重试 | 不持久化,需手搓可靠队列 | 能用,运维成本高 | CREATE + MSG + LIST 兜底 | 中 |
| 主 Agent 感知所有子 Agent 状态 | 轮询 O(n) | SCAN 轮询 | compacted topic,延迟高 | 公开邮箱 + TTL 自动过期 | 高 |
| 任务广播,多个 Worker 竞争消费 | 自建调度 | BRPOP 不可靠 | rebalance 有停顿 | 公开邮箱 + queue group,零配置 | 中 |
| Agent 发现异常,广播告警 | 维护订阅列表 | 不持久化 | 需多 consumer group | 公开告警邮箱 + 多方订阅 | 中 |
| 云端给离线边缘 Agent 发指令 | 直接失败 | 无优先级,内存成本高 | 无原生优先级 | MSG 三级优先级 + 持久化 | 高 |
| Agent 发审批请求,人类审批后继续流程 | 至少三系统对接 | 两端轮询 | 审批员需 Kafka 客户端 | 人机同一协议,双向 MSG | 高 |
| Agent A 向 Agent B 提问,B 可能不在线 | 不在线直接失败 | 两 List + 两端轮询 | 两 topic + 过滤响应 | MSG + reply_to,离线不丢 | 中 |
| Agent 注册能力,其他 Agent 发现并调用 | 需注册中心 | SCAN 轮询 | compacted topic,实现复杂 | 公开邮箱 + TTL 自动清理 | 高 |
八个场景,4 个高优势、4 个中优势。mq9 的核心价值不是某个维度的碾压,而是每个场景都不需要绕路——四个命令字直接覆盖,零额外逻辑。
需要说明的是:mq9 目前处于早期阶段,生态成熟度远不及以上任何一个方案。如果你的场景是大数据管道和高吞吐流处理,Kafka 仍然是最优解。以上对比针对的是 Agent 异步通信这个特定场景。
邮箱行为映射
现实邮箱的完整行为,对应 mq9 四个命令字:
| 现实邮箱 | mq9 命令 | 说明 |
|---|---|---|
| 开通邮箱 | MAILBOX.CREATE | 拿到 mail_address,可选 TTL、public、prefix |
| 寄信 | `MAILBOX.MSG.{mail_address}[.urgent | .critical]` |
| 收信 | SUB $mq9.AI.MAILBOX.MSG.{mail_address} | 在线实时推送,按优先级 critical→urgent→normal |
| 去邮局查信 | MAILBOX.LIST.{mail_address} | 主动拉取,看邮箱里还有哪些消息 |
| 销毁某封信 | MAILBOX.DELETE.{mail_address}.{msg_id} | 处理完的消息显式清理 |
| 邮箱过期 | TTL 自动销毁 | 长期没人用,邮局自动注销 |
| 公告栏 | 公开邮箱 + 多方 SUB | 创建 public 邮箱,所有知道名字的都能看 |
| 抢活 | queue group | 订阅时加 --queue,一条消息只有一个人抢到 |
Python 代码示例
八个场景,八个函数,全部基于 NATS 原生 Python SDK(nats-py),零自定义封装。
import asyncio
import json
import nats
SERVER = "nats://localhost:4222"
# ============================================================
# 场景一:子 Agent 完成任务,异步通知主 Agent
# ============================================================
async def scenario1_async_task_return():
nc = await nats.connect(SERVER)
# 主 Agent:创建私有邮箱
reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
data = json.loads(reply.data)
main_inbox = data["mail_address"]
print(f"[主Agent] 邮箱: {main_inbox}")
received = asyncio.Event()
async def on_message(msg):
print(f"[主Agent] 收到结果: {msg.data.decode()}")
received.set()
# 主 Agent 订阅自己邮箱
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{main_inbox}", cb=on_message)
# 子 Agent:任务完成,发结果
await nc.request(
f"$mq9.AI.MAILBOX.MSG.{main_inbox}",
json.dumps({"task_id": "t-001", "result": "分析完成,异常率2.3%"}).encode(),
timeout=3
)
await asyncio.wait_for(received.wait(), timeout=5)
await nc.close()
# ============================================================
# 场景二:主 Agent 感知所有子 Agent 状态
# ============================================================
async def scenario2_global_status_awareness():
nc = await nats.connect(SERVER)
# 各 Worker 创建公开状态邮箱(TTL=60,定期续期)
for i in range(1, 3):
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": f"status.worker-{i:03d}", "ttl": 60}).encode(),
timeout=3
)
async def on_status(msg):
print(f"[主Agent] 感知状态: {msg.subject} -> {msg.data.decode()}")
# 主 Agent 订阅已知 Worker 的状态邮箱
await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker-001", cb=on_status)
await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker-002", cb=on_status)
# Worker-001 上报状态
await nc.request(
"$mq9.AI.MAILBOX.MSG.status.worker-001",
json.dumps({"status": "running", "load": 0.3, "capabilities": ["data.analysis"]}).encode(),
timeout=3
)
# Worker-002 上报状态
await nc.request(
"$mq9.AI.MAILBOX.MSG.status.worker-002",
json.dumps({"status": "idle", "load": 0.0, "capabilities": ["text.generation"]}).encode(),
timeout=3
)
await asyncio.sleep(0.5)
await nc.close()
# ============================================================
# 场景三:任务广播,多个 Worker 竞争消费
# ============================================================
async def scenario3_task_broadcast_compete():
nc = await nats.connect(SERVER)
# 创建公开任务邮箱
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": "task.queue"}).encode(),
timeout=3
)
for i in range(1, 4):
worker_id = i
async def on_task(msg, wid=worker_id):
print(f"[Worker-{wid}] 抢到任务: {msg.data.decode()}")
# 加入 queue group,同一组内竞争消费
await nc.subscribe("$mq9.AI.MAILBOX.MSG.task.queue", queue="task.workers", cb=on_task)
await asyncio.sleep(0.2)
# 主 Agent 投递任务
for i in range(1, 4):
await nc.request(
"$mq9.AI.MAILBOX.MSG.task.queue",
json.dumps({"task_id": f"t-{i:03d}", "type": "data_analysis"}).encode(),
timeout=3
)
await asyncio.sleep(0.5)
await nc.close()
# ============================================================
# 场景四:Agent 发现异常,广播告警
# ============================================================
async def scenario4_anomaly_broadcast():
nc = await nats.connect(SERVER)
# 创建公开告警邮箱
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": "alert.payment"}).encode(),
timeout=3
)
async def on_alert_a(msg):
print(f"[告警处理A] {msg.data.decode()}")
async def on_alert_b(msg):
print(f"[告警处理B] {msg.data.decode()}")
# 两个处理方各自订阅告警邮箱(各自独立收到同一条消息)
await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_a)
await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_b)
await asyncio.sleep(0.2)
# 监控 Agent 发 critical 告警
await nc.request(
"$mq9.AI.MAILBOX.MSG.alert.payment.critical",
json.dumps({"level": "critical", "detail": "支付超时率突增到15%"}).encode(),
timeout=3
)
await asyncio.sleep(0.5)
await nc.close()
# ============================================================
# 场景五:云端给离线边缘 Agent 发指令
# ============================================================
async def scenario5_edge_offline_message():
nc = await nats.connect(SERVER)
# 边缘 Agent 注册邮箱(上线时创建)
reply = await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"ttl": 3600, "prefix": "edge"}).encode(),
timeout=3
)
edge_inbox = json.loads(reply.data)["mail_address"]
print(f"[边缘设备] 邮箱: {edge_inbox}")
# 云端发送三级指令(边缘模拟离线,消息持久化存储)
await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.critical",
json.dumps({"command": "emergency_stop"}).encode(), timeout=3)
await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.urgent",
json.dumps({"command": "update_config", "interval": 30}).encode(), timeout=3)
await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}",
json.dumps({"command": "report_status"}).encode(), timeout=3)
print("[云端] 指令已发送,等待边缘设备上线...")
# 边缘 Agent "上线",订阅邮箱,按优先级收取
async def on_command(msg):
print(f"[边缘设备] 收到指令: {msg.data.decode()}")
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}", cb=on_command)
# 兜底:LIST 查看邮箱内容
list_reply = await nc.request(f"$mq9.AI.MAILBOX.LIST.{edge_inbox}", b"", timeout=3)
print(f"[边缘设备] LIST 兜底: {list_reply.data.decode()}")
await asyncio.sleep(1)
await nc.close()
# ============================================================
# 场景六:人机混合工作流
# ============================================================
async def scenario6_human_approval():
nc = await nats.connect(SERVER)
# 审批员创建公开邮箱
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": "approver.zhang"}).encode(),
timeout=3
)
# Agent 创建私有邮箱
reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
agent_inbox = json.loads(reply.data)["mail_address"]
approved = asyncio.Event()
async def on_approval_result(msg):
result = json.loads(msg.data)
print(f"[Agent] 收到审批结果: approved={result['approved']}")
approved.set()
async def on_approval_request(msg):
req = json.loads(msg.data)
print(f"[审批员] 收到请求: {req['content']}")
# 审批通过,回复 Agent
await nc.request(
f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
json.dumps({"approved": True, "correlation_id": req["correlation_id"]}).encode(),
timeout=3
)
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.approver.zhang", cb=on_approval_request)
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{agent_inbox}", cb=on_approval_result)
await asyncio.sleep(0.2)
# Agent 发起审批请求
await nc.request(
"$mq9.AI.MAILBOX.MSG.approver.zhang.urgent",
json.dumps({
"type": "approval_request",
"content": "申请调用外部API,预计费用$50",
"reply_to": agent_inbox,
"correlation_id": "approval-001"
}).encode(),
timeout=3
)
await asyncio.wait_for(approved.wait(), timeout=5)
await nc.close()
# ============================================================
# 场景七:Agent A 向 Agent B 提问,B 可能不在线
# ============================================================
async def scenario7_async_request_reply():
nc = await nats.connect(SERVER)
# A 和 B 各自创建邮箱
reply_a = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
inbox_a = json.loads(reply_a.data)["mail_address"]
reply_b = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
inbox_b = json.loads(reply_b.data)["mail_address"]
answered = asyncio.Event()
async def on_question(msg):
req = json.loads(msg.data)
print(f"[Agent-B] 收到提问: {req['question']}")
await nc.request(
f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
json.dumps({"answer": "当前进度72%", "correlation_id": req["correlation_id"]}).encode(),
timeout=3
)
async def on_answer(msg):
print(f"[Agent-A] 收到回复: {msg.data.decode()}")
answered.set()
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_b}", cb=on_question)
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_a}", cb=on_answer)
await asyncio.sleep(0.2)
# A 向 B 提问
await nc.request(
f"$mq9.AI.MAILBOX.MSG.{inbox_b}",
json.dumps({
"question": "当前进度多少?",
"reply_to": inbox_a,
"correlation_id": "req-001"
}).encode(),
timeout=3
)
await asyncio.wait_for(answered.wait(), timeout=5)
await nc.close()
# ============================================================
# 场景八:Agent 注册能力,其他 Agent 发现并调用
# ============================================================
async def scenario8_capability_discovery():
nc = await nats.connect(SERVER)
# 创建公开查询邮箱
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": "capability.query"}).encode(),
timeout=3
)
# 编排器创建私有邮箱
reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
orchestrator_inbox = json.loads(reply.data)["mail_address"]
# Worker-001 注册能力(公开邮箱 + 发布能力声明)
await nc.request(
"$mq9.AI.MAILBOX.CREATE",
json.dumps({"public": True, "name": "capability.worker-001", "ttl": 60}).encode(),
timeout=3
)
await nc.request(
"$mq9.AI.MAILBOX.MSG.capability.worker-001",
json.dumps({"capabilities": ["data.analysis", "report.generation"], "load": 0.3}).encode(),
timeout=3
)
async def on_query(msg):
req = json.loads(msg.data)
if "data.analysis" in req.get("need", ""):
print("[Worker-001] 响应能力查询")
await nc.request(
f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
json.dumps({
"from": "capability.worker-001",
"capabilities": ["data.analysis", "report.generation"],
"load": 0.3
}).encode(),
timeout=3
)
async def on_response(msg):
print(f"[编排器] 发现有能力的 Agent: {msg.data.decode()}")
await nc.subscribe("$mq9.AI.MAILBOX.MSG.capability.query", cb=on_query)
await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{orchestrator_inbox}", cb=on_response)
await asyncio.sleep(0.2)
# 编排器广播查询
print("[编排器] 查询: 谁有 data.analysis 能力?")
await nc.request(
"$mq9.AI.MAILBOX.MSG.capability.query",
json.dumps({"need": "data.analysis", "reply_to": orchestrator_inbox}).encode(),
timeout=3
)
await asyncio.sleep(0.5)
await nc.close()
# ============================================================
# main
# ============================================================
async def main():
print("=== 场景一:异步任务回传 ===")
await scenario1_async_task_return()
print("\n=== 场景二:全局状态感知 ===")
await scenario2_global_status_awareness()
print("\n=== 场景三:任务广播与竞争消费 ===")
await scenario3_task_broadcast_compete()
print("\n=== 场景四:异常告警广播 ===")
await scenario4_anomaly_broadcast()
print("\n=== 场景五:边缘离线消息积压 ===")
await scenario5_edge_offline_message()
print("\n=== 场景六:人机混合工作流 ===")
await scenario6_human_approval()
print("\n=== 场景七:异步请求-响应 ===")
await scenario7_async_request_reply()
print("\n=== 场景八:能力注册与发现 ===")
await scenario8_capability_discovery()
asyncio.run(main())