mq9 vs Existing Solutions: Eight Scenarios Side by Side
Agent-to-agent communication has no standard answer today. Everyone is working around it with whatever is at hand — HTTP, Redis, Kafka, home-grown queues. It works, but every scenario requires adding something extra.
This article takes the eight core scenarios mq9 is designed to address and compares them one by one: how existing solutions handle them, where the pain is, how mq9 handles them, and how big the advantage is. No hype, just facts.
Protocol basics: mq9 has exactly four commands:
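MAILBOX.CREATE, MAILBOX.MSG.{mail_address}, MAILBOX.LIST.{mail_address}, and MAILBOX.DELETE.{mail_address}.{msg_id}. In the examples below they are addressed as NATS subjects under the $mq9.AI. prefix, and MSG accepts an optional .urgent or .critical suffix for priority. Subscriptions use MAILBOX.MSG.{mail_address} with an optional queue group. All operations require the exact mail_address; wildcard cross-mailbox subscriptions are not allowed.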
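Every example in this article spells these subjects out by hand. A small helper can keep the prefix, the priority suffix, and the "exact mail_address only" rule in one place. This is a minimal sketch: `mq9_subject` is a hypothetical name, and the `$mq9.AI.MAILBOX` prefix is copied from the examples here, not from a published specification.

```python
from typing import Optional

# Hypothetical helper: assembles subjects in the form used by this article's examples.
PREFIX = "$mq9.AI.MAILBOX"
SUFFIX = {"normal": "", "urgent": ".urgent", "critical": ".critical"}


def mq9_subject(command: str, mail_address: str = "",
                msg_id: Optional[int] = None, priority: str = "normal") -> str:
    """Return the NATS subject for CREATE, MSG, LIST or DELETE."""
    if command == "CREATE":
        return f"{PREFIX}.CREATE"
    # Every other command targets one exact mailbox, so wildcards are rejected
    if not mail_address or "*" in mail_address or ">" in mail_address:
        raise ValueError(f"{command} needs an exact mail_address, got {mail_address!r}")
    if command == "MSG":
        return f"{PREFIX}.MSG.{mail_address}{SUFFIX[priority]}"
    if command == "LIST":
        return f"{PREFIX}.LIST.{mail_address}"
    if command == "DELETE":
        if msg_id is None:  # msg_id 0 is valid, so test for None explicitly
            raise ValueError("DELETE needs a msg_id")
        return f"{PREFIX}.DELETE.{mail_address}.{msg_id}"
    raise ValueError(f"unknown command {command!r}")


print(mq9_subject("MSG", "edge.device", priority="critical"))
# prints $mq9.AI.MAILBOX.MSG.edge.device.critical
```

The explicit `is None` check matters because the first message in a mailbox can come back as `{"msg_id":0}`.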
Scenario 1: Sub-Agent Completes Task, Notifies Orchestrator Asynchronously
HTTP: The sub-agent calls back the orchestrator's webhook. The orchestrator must be online and expose an endpoint. On restart or network blip, the callback is lost — retry logic, idempotency, and timeout handling all have to be built by hand, every project.
Redis: Sub-agent publishes to a channel; orchestrator subscribes. If the orchestrator is offline at that moment, the message is gone. For reliable delivery you have to add Redis Streams or a List, essentially hand-rolling a message queue on top of Redis.
Kafka: Create a topic, sub-agent produces, orchestrator consumes. It works, but you need to pre-create the topic, configure partitions and retention, and manage consumer groups and offsets. Returning a single task result pulls in the full Kafka operational burden.
mq9: The orchestrator calls MAILBOX.CREATE to get a mail_address; the sub-agent uses MAILBOX.MSG.{mail_address} to send the result. If the orchestrator is online it receives immediately; if offline the message is persisted and pushed on reconnect. MAILBOX.LIST can be used as a safety net to confirm nothing was missed.
```bash
# Orchestrator creates its mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":3600}'
# → {"mail_address":"abc123ef"}

# Sub-agent sends result (normal priority)
nats request '$mq9.AI.MAILBOX.MSG.abc123ef' '{"task_id":"t-001","result":"analysis complete"}'
# → {"msg_id":0}

# Orchestrator subscribes (including unread history)
nats subscribe '$mq9.AI.MAILBOX.MSG.abc123ef'

# Safety net: pull mailbox contents proactively
nats request '$mq9.AI.MAILBOX.LIST.abc123ef' ''
```

Advantage: Medium. No requirement for the other side to be online vs HTTP; native persistence vs Redis; zero ops configuration vs Kafka.
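Messages stay in a mailbox until an explicit DELETE, so the LIST safety net can return results the subscription already delivered. The orchestrator therefore benefits from idempotent handling. A minimal sketch, assuming each result carries the `task_id` shown above and that a LIST entry's body is the original payload (the LIST response format is not specified here); `ResultCollector` is a hypothetical name.

```python
import json


class ResultCollector:
    """Keeps the first result seen per task_id; later copies are ignored."""

    def __init__(self):
        self.results = {}

    def handle(self, raw: bytes) -> bool:
        """Return True if this message was new, False if it was a duplicate."""
        payload = json.loads(raw)
        task_id = payload["task_id"]
        if task_id in self.results:
            return False
        self.results[task_id] = payload["result"]
        return True


collector = ResultCollector()
pushed = b'{"task_id":"t-001","result":"analysis complete"}'
collector.handle(pushed)   # delivered by the subscription
collector.handle(pushed)   # the same message seen again via LIST
print(collector.results)   # prints {'t-001': 'analysis complete'}
```

Keying on a business identifier rather than on delivery metadata keeps the handler correct regardless of which path (push or pull) delivers the message first.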
Scenario 2: Orchestrator Monitors All Worker Agent Status
HTTP polling: The orchestrator periodically requests each sub-agent's /health endpoint. Requires maintaining an agent list — register on start, deregister on exit. N agents means N requests, O(n) overhead. Too long an interval means slow detection; too short wastes resources.
Redis: Each agent writes a key with TTL; the orchestrator uses SCAN to iterate. O(n) operation, degrades with many agents. Redis has no reliable native "subscribe to key changes" — polling only.
Kafka: Each agent writes its state to a compacted topic. Works, but compaction has latency. Every write is a persisted log entry — full persistence overhead just to track the latest state.
mq9: Each worker creates a public mailbox on startup with a well-known name (e.g., status.worker001), reports status periodically via MAILBOX.MSG, and sets a TTL for its mailbox lifetime. The orchestrator subscribes to each known worker's mailbox. When a worker stops updating, the TTL expiry removes its mailbox automatically — no explicit deregistration needed.
```bash
# Worker-001 starts, registers public status mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":60,"public":true,"name":"status.worker001","desc":"worker status"}'

# Worker-001 reports status periodically (every 30s)
nats request '$mq9.AI.MAILBOX.MSG.status.worker001' '{"status":"running","load":0.3}'

# Orchestrator subscribes to each known worker's mailbox (each command runs in its own terminal)
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker001'
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker002'
```

Advantage: High. Zero polling; TTL expiry automatically detects worker exit; new workers register under a known name and the orchestrator subscribes on demand.
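The article does not say whether subscribers are notified when a status mailbox expires, so an orchestrator that needs a live roster may want to mirror the TTL on its side. A minimal sketch under that assumption: the orchestrator records when each mailbox last reported and treats a worker as gone once that report is older than the TTL. `WorkerRoster` and the injectable clock are illustrative, not part of mq9.

```python
import json
import time


class WorkerRoster:
    """Orchestrator-side view of worker liveness that mirrors the mailbox TTL."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self.last_seen = {}  # mail_address -> (timestamp, status payload)

    def on_status(self, mail_address: str, raw: bytes) -> None:
        self.last_seen[mail_address] = (self.clock(), json.loads(raw))

    def live_workers(self) -> dict:
        now = self.clock()
        return {addr: status for addr, (ts, status) in self.last_seen.items()
                if now - ts < self.ttl}


now = [0.0]  # fake clock for the demo
roster = WorkerRoster(ttl_seconds=60, clock=lambda: now[0])
roster.on_status("status.worker001", b'{"status":"running","load":0.3}')
now[0] = 45.0
roster.on_status("status.worker002", b'{"status":"idle","load":0.0}')
now[0] = 70.0
print(sorted(roster.live_workers()))  # prints ['status.worker002']
```

Worker-001 last reported at t=0, so at t=70 it has exceeded the 60-second TTL and drops out of the roster.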
Scenario 3: Task Fan-Out, Multiple Workers Compete to Consume
HTTP: The orchestrator must implement load balancing, maintain a worker list, pick one, and send. Worker failure means retry and failover. Writing your own scheduler and fault-tolerance logic every time.
Redis: Orchestrator does LPUSH to a List; workers use BRPOP to compete. Most common approach, but no acknowledgement — a worker that BRPOPs a task and crashes loses it. Need to build a reliable queue on top. No priority, no fan-out capability.
Kafka: Publish to a topic; multiple consumers in the same group compete. Works, but partition count sets the parallelism ceiling; scaling workers dynamically requires rebalancing; rebalancing causes a consumption pause.
mq9: Create a public task mailbox (e.g., task.queue); the orchestrator delivers tasks via MAILBOX.MSG; workers subscribe with a queue group to compete — each message goes to exactly one worker. Workers can join or leave dynamically with no configuration change.
```bash
# Orchestrator creates public task mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"task.queue","desc":"task queue"}'

# Orchestrator delivers tasks
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-001","type":"data_analysis"}'
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-002","type":"data_analysis"}'

# Workers compete using a queue group
nats subscribe '$mq9.AI.MAILBOX.MSG.task.queue' --queue task.workers
# Worker-1, Worker-2, Worker-3 each run the above; each message goes to exactly one
```

Advantage: Medium. No worker list to maintain vs HTTP; no BRPOP loss risk vs Redis; no rebalance vs Kafka. Note: mq9 is early-stage, so its production stability still needs validation.
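The competing-consumer rule boils down to two guarantees: each message goes to exactly one current member of the group, and membership can change between messages without reconfiguration. The toy model below illustrates that rule in plain Python. It is not how the mq9 server selects a member; core NATS queue groups pick a member at random, which the sketch imitates with a seeded `random.Random`. `QueueGroup` is a hypothetical name.

```python
import random
from collections import defaultdict


class QueueGroup:
    """Toy model of queue-group delivery: one message, one current member."""

    def __init__(self, seed: int = 0):
        self.members = []
        self.rng = random.Random(seed)
        self.received = defaultdict(list)

    def join(self, worker: str) -> None:
        self.members.append(worker)

    def leave(self, worker: str) -> None:
        self.members.remove(worker)

    def deliver(self, task: str) -> str:
        worker = self.rng.choice(self.members)  # exactly one recipient
        self.received[worker].append(task)
        return worker


group = QueueGroup()
for w in ("worker-1", "worker-2", "worker-3"):
    group.join(w)
for i in range(1, 4):
    group.deliver(f"t-{i:03d}")
group.leave("worker-3")  # scale down: no rebalance or config change
for i in range(4, 7):
    group.deliver(f"t-{i:03d}")
```

After the loop, every task appears in exactly one worker's list, and worker-3 holds none of the tasks sent after it left.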
Scenario 4: Agent Detects Anomaly, Broadcasts Alert
HTTP: The detecting agent needs to know who can handle it, maintain a subscriber list, and POST to each one individually. Maintaining that list is itself a distributed systems problem.
Redis: PUBLISH to a channel; all subscribers receive. Works, but Pub/Sub has no persistence, so anyone offline at that moment misses the message. If an alert fires while a handler is restarting, that handler never sees it.
Kafka: Publish to a topic; each handler consumes with its own consumer group. Fan-out works, but each handler needs its own group configured and the topic must be pre-created.
mq9: Create a public alert mailbox (e.g., alert.payment); the monitoring agent publishes alerts there; all interested agents subscribe independently. Messages are persisted — critical alerts are not missed on reconnect. The publisher does not need to know who is listening.
```bash
# Create public alert mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"alert.payment","desc":"payment domain alerts"}'

# Monitoring agent publishes critical alert
nats request '$mq9.AI.MAILBOX.MSG.alert.payment.critical' '{"level":"critical","detail":"payment timeout rate spiked to 15%"}'

# Alert handler A subscribes
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'

# Alert handler B also subscribes in another terminal (same mailbox, each receives independently)
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'
```

Advantage: Medium. No subscriber list to maintain vs HTTP; persistent delivery vs Redis; no pre-configured consumer groups vs Kafka. Each handler needs to know the alert mailbox name (fixed by convention).
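This scenario differs from the previous one only in how handlers subscribe: independent subscriptions each receive a copy, while members of one queue group share a single copy. A minimal model of that routing rule, assuming mq9 follows the NATS-style semantics this article describes; `route` and its `(name, queue_group)` tuples are hypothetical. The second call shows a pitfall: two alert handlers that accidentally share a queue group would each see only some of the alerts.

```python
import random


def route(subscriptions, rng):
    """Return the subscriber names that receive one published message.

    subscriptions: list of (name, queue_group or None).
    Plain subscribers all get a copy; each queue group gets exactly one copy.
    """
    recipients = [name for name, queue in subscriptions if queue is None]
    groups = {}
    for name, queue in subscriptions:
        if queue is not None:
            groups.setdefault(queue, []).append(name)
    for members in groups.values():
        recipients.append(rng.choice(members))
    return recipients


rng = random.Random(1)
independent = [("handler-a", None), ("handler-b", None)]
print(route(independent, rng))  # prints ['handler-a', 'handler-b']
shared = [("handler-a", "alerts"), ("handler-b", "alerts")]
print(len(route(shared, rng)))  # prints 1
```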
Scenario 5: Cloud Sends Commands to Offline Edge Agent
HTTP: Fails directly. Edge is offline, request times out. You have to build a retry queue, exponential backoff, and command persistence — essentially hand-rolling a message queue in the cloud.
Redis: Write commands to a List; edge consumes on reconnect. No priority — emergency and routine commands queue up together. To add priority you need Sorted Set, increasing complexity. Redis is in-memory; large backlogs are expensive.
Kafka: Write to a topic; edge consumes when it comes online. Works, but no native priority — strict FIFO within a partition. Urgent commands need a separate topic; the consumer must subscribe to multiple topics and do its own priority scheduling.
mq9: Each edge agent has its own mail_address (created on startup, communicated to the cloud). The cloud uses MAILBOX.MSG.{mail_address}.critical for emergency commands, .urgent for important commands, no suffix for normal commands. When the edge reconnects and subscribes to its mailbox, the server pushes messages in critical → urgent → normal order. MAILBOX.LIST is available as a safety net.
```bash
# Cloud: send priority-graded commands (edge may be offline; messages are persisted)
nats request '$mq9.AI.MAILBOX.MSG.edge.device.critical' '{"command":"emergency_stop","reason":"temperature too high"}'
nats request '$mq9.AI.MAILBOX.MSG.edge.device.urgent' '{"command":"update_config","interval":30}'
nats request '$mq9.AI.MAILBOX.MSG.edge.device' '{"command":"report_status"}'

# Edge agent reconnects and subscribes to its mailbox (pushed in priority order)
nats subscribe '$mq9.AI.MAILBOX.MSG.edge.device'

# Safety net: proactively pull to check for missed messages
nats request '$mq9.AI.MAILBOX.LIST.edge.device' ''
```

Advantage: High. No requirement for the other side to be online vs HTTP; three-level priority with low-cost persistence (RocksDB) vs Redis; native priority without multiple topics vs Kafka.
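The replay order in this scenario depends on two inputs: the priority suffix on the MSG subject and arrival order. A minimal sketch of that ordering, assuming a backlog is replayed critical, then urgent, then normal (as stated above) and FIFO within a level (an assumption; the article does not say). The helper names are hypothetical. Under this suffix-based parsing, a mailbox whose own name ends in `.urgent` or `.critical` would be ambiguous.

```python
PRIORITY_RANK = {"critical": 0, "urgent": 1, "normal": 2}
MSG_PREFIX = "$mq9.AI.MAILBOX.MSG."


def parse_msg_subject(subject: str):
    """Split a MSG subject into (mail_address, priority)."""
    if not subject.startswith(MSG_PREFIX):
        raise ValueError(f"not a MSG subject: {subject}")
    rest = subject[len(MSG_PREFIX):]
    head, _, last = rest.rpartition(".")
    if head and last in ("critical", "urgent"):
        return head, last
    return rest, "normal"  # no suffix means normal priority


def replay_order(backlog):
    """Order (subject, payload) pairs by priority; sorted() is stable, so FIFO holds per level."""
    return sorted(backlog, key=lambda item: PRIORITY_RANK[parse_msg_subject(item[0])[1]])


backlog = [
    ("$mq9.AI.MAILBOX.MSG.edge.device", "report_status"),
    ("$mq9.AI.MAILBOX.MSG.edge.device.urgent", "update_config"),
    ("$mq9.AI.MAILBOX.MSG.edge.device.critical", "emergency_stop"),
]
print([payload for _, payload in replay_order(backlog)])
# prints ['emergency_stop', 'update_config', 'report_status']
```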
Scenario 6: Human-in-the-Loop Approval Workflow
HTTP + frontend: Agent calls an approval service API to create a ticket; the frontend polls or uses WebSocket to push to the approver; the approver acts and triggers a callback to the agent. At least three systems wired together: Agent → approval service → frontend → approval service → Agent.
Redis: Agent writes approval request to a key; approver client polls; approver writes result to another key; agent polls for the result. Both sides polling, high latency, logic scattered everywhere.
Kafka: Approval requests go to one topic; approval results go to another. Works, but the approver needs a Kafka client — unfriendly for non-technical users.
mq9: The approver has their own mail_address (a well-known public mailbox such as approver.zhang). The agent sends the approval request to MAILBOX.MSG.{approver_mail_address}.urgent; the approver's client subscribes to their own mailbox; after approval they reply via MAILBOX.MSG.{agent_mail_address}. Humans and agents use the exact same protocol — no separate approval service needed. A correlation_id in the payload ensures correct pairing.
```bash
# Approver's mailbox (pre-created public mailbox, name is the address)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"approver.zhang","desc":"approver mailbox"}'

# Agent sends approval request (urgent priority); reply_to tells the approver where to answer
nats request '$mq9.AI.MAILBOX.MSG.approver.zhang.urgent' '{
  "from":"agent",
  "type":"approval_request",
  "reply_to":"agent",
  "correlation_id":"approval-001",
  "content":"requesting external API call, estimated cost $50"
}'

# Approver client subscribes to own mailbox
nats subscribe '$mq9.AI.MAILBOX.MSG.approver.zhang'

# Approver replies to the agent's mailbox
nats request '$mq9.AI.MAILBOX.MSG.agent' '{
  "from":"approver.zhang",
  "approved":true,
  "correlation_id":"approval-001"
}'

# Agent subscribes to its own mailbox to receive the approval result
nats subscribe '$mq9.AI.MAILBOX.MSG.agent'
```

Advantage: High. Humans and agents share one protocol; no separate approval service; messages are persisted, so an approver going offline loses nothing.
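On the approver side, the only protocol work is reading where to reply and echoing the correlation_id back. A minimal sketch, assuming the request carries `type`, `reply_to`, and `correlation_id` fields as above; `build_approval_reply` is a hypothetical name, and the returned pair is what a nats-py client would pass to `nc.request(subject, body, timeout=3)`.

```python
import json

MSG_PREFIX = "$mq9.AI.MAILBOX.MSG."


def build_approval_reply(raw_request: bytes, approved: bool, approver: str):
    """Return (subject, payload) for the approver's answer to one request."""
    req = json.loads(raw_request)
    if req.get("type") != "approval_request":
        raise ValueError("not an approval request")
    missing = [k for k in ("reply_to", "correlation_id") if k not in req]
    if missing:
        raise ValueError(f"request is missing {missing}")
    # Reply goes to the requester's mailbox at normal priority
    subject = MSG_PREFIX + req["reply_to"]
    payload = {"from": approver, "approved": approved,
               "correlation_id": req["correlation_id"]}
    return subject, json.dumps(payload).encode()


request = json.dumps({"type": "approval_request", "reply_to": "agent",
                      "correlation_id": "approval-001",
                      "content": "requesting external API call, estimated cost $50"}).encode()
subject, body = build_approval_reply(request, approved=True, approver="approver.zhang")
print(subject)  # prints $mq9.AI.MAILBOX.MSG.agent
```

Rejecting requests without `reply_to` up front keeps a human approver from approving something whose answer has nowhere to go.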
Scenario 7: Agent A Asks Agent B a Question — B May Be Offline
HTTP: B is offline → immediate failure. For async, you need a callback URL, correlation_id, retry logic, and a state machine — complexity explodes.
Redis: A writes the request to B's List; B writes the result to A's List. Two Lists, both sides polling, correlation_id matching by hand. Works, but non-trivial code.
Kafka: A sends request to a request topic with a reply_to header; B produces the response to a reply topic. Two topics, two consumer groups; A must filter the reply topic for responses addressed to itself.
mq9: A creates its own private mailbox; sends the question to MAILBOX.MSG.{B_mail_address} with reply_to (A's mail_address) and correlation_id in the payload. B subscribes to its mailbox when online, processes the question, and replies via MAILBOX.MSG.{reply_to}. While B is offline the request is persisted in B's mailbox, pushed when B reconnects.
```bash
# A creates its mailbox to receive the reply
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":300}'
# → {"mail_address":"agent.a"}  (illustrative; a private mailbox gets a server-assigned address, as in Scenario 1)

# A asks B a question with reply_to
nats request '$mq9.AI.MAILBOX.MSG.agent.b' '{
  "from":"agent.a",
  "question":"what is the current progress?",
  "reply_to":"agent.a",
  "correlation_id":"req-001"
}'

# B subscribes to its mailbox (runs when B comes online)
nats subscribe '$mq9.AI.MAILBOX.MSG.agent.b'

# B replies to reply_to after processing
nats request '$mq9.AI.MAILBOX.MSG.agent.a' '{
  "from":"agent.b",
  "answer":"current progress is 72%",
  "correlation_id":"req-001"
}'

# A subscribes to its own mailbox waiting for the reply
nats subscribe '$mq9.AI.MAILBOX.MSG.agent.a'
```

Advantage: Medium. B being offline does not lose messages; A does not have to wait online; fully async on both sides. No two-topic setup or reply-filtering logic vs Kafka.
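While A stays online, pairing replies with questions is ordinary correlation_id bookkeeping: register one pending future per question and resolve it when a reply with the same id lands in A's mailbox. A minimal asyncio sketch, assuming replies echo `correlation_id` as above; `Correlator` is hypothetical. If A goes offline the in-memory futures are lost, but the replies themselves still wait in A's persisted mailbox.

```python
import asyncio
import json
import uuid


class Correlator:
    """Pairs replies with outstanding questions by correlation_id."""

    def __init__(self):
        self.pending = {}

    def new_request(self, question: str, reply_to: str):
        correlation_id = f"req-{uuid.uuid4().hex[:8]}"
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        payload = {"question": question, "reply_to": reply_to,
                   "correlation_id": correlation_id}
        return json.dumps(payload).encode(), future

    def on_reply(self, raw: bytes) -> bool:
        """Resolve the matching future; return False for unknown or late replies."""
        reply = json.loads(raw)
        future = self.pending.pop(reply.get("correlation_id"), None)
        if future is None or future.done():
            return False
        future.set_result(reply)
        return True


async def demo():
    corr = Correlator()
    body, future = corr.new_request("what is the current progress?", reply_to="agent.a")
    cid = json.loads(body)["correlation_id"]
    # Simulate B's reply arriving in A's mailbox
    corr.on_reply(json.dumps({"answer": "current progress is 72%", "correlation_id": cid}).encode())
    reply = await asyncio.wait_for(future, timeout=5)
    return reply["answer"]


print(asyncio.run(demo()))  # prints current progress is 72%
```

In a nats-py client, `on_reply` would be called from the subscription callback on A's mailbox, and `body` sent to B's mailbox with `nc.request`.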
Scenario 8: Agent Registers Capabilities, Others Discover and Call It
HTTP + registry: Requires a centralized service registry (Consul, Eureka, custom-built). Agent registers on start, deregisters on exit, or relies on health checks for pruning. Introduces an additional stateful service that itself needs high-availability deployment.
Redis: Agent writes capabilities to a Hash or Set with TTL; querying side uses SCAN or SMEMBERS. Works, but no mechanism to subscribe to capability changes — polling only. SCAN degrades with many agents.
Kafka: Capability declarations go to a compacted topic; querying side consumes the entire topic to build a local view. High latency, complex implementation — a full Kafka consumer chain just for simple service discovery.
mq9: Each agent creates a public mailbox with a well-known name on startup (e.g., capability.worker001) and posts a capability declaration. TTL controls registration validity — no renewal means automatic expiry. The orchestrator needing discovery creates a public query mailbox (e.g., capability.query), broadcasts its query there, and capable agents reply directly to the orchestrator's mailbox.
```bash
# Worker-001 registers capabilities (TTL=60, must renew periodically)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.worker001","ttl":60}'
nats request '$mq9.AI.MAILBOX.MSG.capability.worker001' '{
  "capabilities":["data.analysis","report.generation"],
  "load":0.3,
  "reply_to":"capability.worker001"
}'

# Orchestrator creates query mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.query"}'

# Workers subscribe to query mailbox and reply when queried
nats subscribe '$mq9.AI.MAILBOX.MSG.capability.query'

# Orchestrator broadcasts: who has data.analysis capability?
nats request '$mq9.AI.MAILBOX.MSG.capability.query' '{
  "from":"orchestrator",
  "need":"data.analysis",
  "reply_to":"orchestrator"
}'

# Orchestrator subscribes to its own mailbox to receive responses
nats subscribe '$mq9.AI.MAILBOX.MSG.orchestrator'
```

Advantage: High. Zero extra registry; TTL automatically detects agent exit; fully decentralized.
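Both halves of discovery reduce to small decisions: a worker answers only if the requested capability is one it declared, and the orchestrator chooses among the replies it collects. A minimal sketch, assuming replies carry the `from`, `capabilities`, and `load` fields used above. Picking the least-loaded responder is an illustrative policy, not something mq9 prescribes, and `capability.worker002` in the demo is a hypothetical second worker.

```python
import json


def should_answer(raw_query: bytes, my_capabilities) -> bool:
    """Worker side: exact match on the requested capability name."""
    return json.loads(raw_query).get("need") in my_capabilities


def pick_agent(raw_replies, need: str):
    """Orchestrator side: least-loaded responder that really has the capability."""
    candidates = [json.loads(r) for r in raw_replies]
    candidates = [c for c in candidates if need in c.get("capabilities", [])]
    if not candidates:
        return None
    return min(candidates, key=lambda c: c.get("load", 1.0))["from"]


query = b'{"need":"data.analysis","reply_to":"orchestrator"}'
print(should_answer(query, ["data.analysis", "report.generation"]))  # prints True
replies = [
    b'{"from":"capability.worker001","capabilities":["data.analysis"],"load":0.3}',
    b'{"from":"capability.worker002","capabilities":["data.analysis"],"load":0.1}',
]
print(pick_agent(replies, "data.analysis"))  # prints capability.worker002
```

Matching on the exact name rather than a substring prevents a worker that declares `data.analysis` from answering a query for `data.analysis.v2`.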
Summary
| Scenario | HTTP | Redis | Kafka | mq9 | mq9 Advantage |
|---|---|---|---|---|---|
| Sub-agent completes task, notifies orchestrator asynchronously | Requires other side online; manual retry | No persistence; hand-roll reliable queue | Works; high ops overhead | CREATE + MSG + LIST safety net | Medium |
| Orchestrator monitors all worker status | O(n) polling | SCAN polling | Compacted topic; lag | Public mailbox (status.worker001) + TTL auto-expiry | High |
| Task fan-out, multiple workers compete | Build own scheduler | BRPOP — unreliable | Rebalance causes pause | Public mailbox (task.queue) + queue group; zero config | Medium |
| Agent detects anomaly, broadcasts alert | Maintain subscriber list | No persistence | Need multiple consumer groups | Public alert mailbox (alert.payment) + multi-subscriber | Medium |
| Cloud sends commands to offline edge agent | Immediate failure | No priority; high memory cost | No native priority | MSG three-level priority + persistence (edge.device) | High |
| Agent sends approval request; human approves and flow continues | At least three systems | Both sides polling | Approver needs Kafka client | Human and agent share one protocol, bidirectional MSG (approver.zhang) | High |
| Agent A asks Agent B a question; B may be offline | Immediate failure if offline | Two Lists + both sides polling | Two topics + filter responses | MSG + reply_to; offline-safe (agent.a, agent.b) | Medium |
| Agent registers capabilities; others discover and call | Needs registry | SCAN polling | Compacted topic; complex | Public mailbox (capability.query) + TTL auto-cleanup | High |
Eight scenarios: 4 high-advantage, 4 medium-advantage. mq9's core value is not dominating on a single dimension; it is that no scenario requires a workaround. Four commands cover every scenario directly, with no extra infrastructure bolted on.
Worth noting: mq9 is still early-stage; its ecosystem maturity is nowhere near any of the solutions above. If your use case is big-data pipelines and high-throughput stream processing, Kafka remains the optimal choice. The comparisons above are specific to agent-to-agent async communication.
Mailbox Behavior Mapping
The everyday behaviors of a real-world mailbox, mapped to mq9's four commands and their options:
| Real-world mailbox | mq9 command | Notes |
|---|---|---|
| Open a mailbox | MAILBOX.CREATE | Get a mail_address; optional TTL, public flag, name, desc, prefix |
| Send a letter | MAILBOX.MSG.{mail_address}, optionally suffixed .urgent or .critical | Recipient doesn't need to be home; supports critical/urgent/normal priority |
| Receive letters | SUB $mq9.AI.MAILBOX.MSG.{mail_address} | Online: real-time push in critical→urgent→normal order |
| Check the PO box | MAILBOX.LIST.{mail_address} | Active pull; see what messages remain in the mailbox |
| Discard a letter | MAILBOX.DELETE.{mail_address}.{msg_id} | Explicit cleanup after processing |
| Mailbox expires | TTL auto-destroy | Unused long enough — the post office cancels it automatically |
| Bulletin board | Public mailbox + multi-subscriber SUB | Create a public mailbox; anyone who knows the name can read it |
| First come, first served | Queue group | Add --queue on subscribe; each message goes to exactly one subscriber |
Python Code Examples
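Eight scenarios, eight functions, all using the native NATS Python SDK (nats-py) with zero custom wrappers. For brevity, each function plays every role (orchestrator, workers, agents, approver) on a single connection; in a real deployment each role would run in its own process.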
```python
import asyncio
import json

import nats

SERVER = "nats://localhost:4222"


# ============================================================
# Scenario 1: Sub-agent completes task, notifies orchestrator asynchronously
# ============================================================
async def scenario1_async_task_return():
    nc = await nats.connect(SERVER)
    # Orchestrator: create private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
    data = json.loads(reply.data)
    main_inbox = data["mail_address"]
    print(f"[Orchestrator] mailbox: {main_inbox}")
    received = asyncio.Event()

    async def on_message(msg):
        print(f"[Orchestrator] received result: {msg.data.decode()}")
        received.set()

    # Orchestrator subscribes to its own mailbox
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{main_inbox}", cb=on_message)
    # Sub-agent: task complete, send result
    await nc.request(
        f"$mq9.AI.MAILBOX.MSG.{main_inbox}",
        json.dumps({"task_id": "t-001", "result": "analysis complete, anomaly rate 2.3%"}).encode(),
        timeout=3,
    )
    await asyncio.wait_for(received.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 2: Orchestrator monitors all worker status
# ============================================================
async def scenario2_global_status_awareness():
    nc = await nats.connect(SERVER)
    # Each worker creates a public status mailbox (TTL=60, renew periodically)
    for i in range(1, 3):
        await nc.request(
            "$mq9.AI.MAILBOX.CREATE",
            json.dumps({"public": True, "name": f"status.worker{i:03d}", "ttl": 60}).encode(),
            timeout=3,
        )

    async def on_status(msg):
        print(f"[Orchestrator] status update: {msg.subject} -> {msg.data.decode()}")

    # Orchestrator subscribes to known workers' status mailboxes
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker001", cb=on_status)
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker002", cb=on_status)
    # Worker-001 reports status
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.status.worker001",
        json.dumps({"status": "running", "load": 0.3, "capabilities": ["data.analysis"]}).encode(),
        timeout=3,
    )
    # Worker-002 reports status
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.status.worker002",
        json.dumps({"status": "idle", "load": 0.0, "capabilities": ["text.generation"]}).encode(),
        timeout=3,
    )
    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 3: Task fan-out, multiple workers compete to consume
# ============================================================
async def scenario3_task_broadcast_compete():
    nc = await nats.connect(SERVER)
    # Create public task mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "task.queue"}).encode(),
        timeout=3,
    )
    for worker_id in range(1, 4):
        # Bind worker_id as a default argument so each callback keeps its own id
        async def on_task(msg, wid=worker_id):
            print(f"[Worker-{wid}] claimed task: {msg.data.decode()}")

        # Join queue group: members of the same group compete for each message
        await nc.subscribe("$mq9.AI.MAILBOX.MSG.task.queue", queue="task.workers", cb=on_task)
    await asyncio.sleep(0.2)
    # Orchestrator delivers tasks
    for i in range(1, 4):
        await nc.request(
            "$mq9.AI.MAILBOX.MSG.task.queue",
            json.dumps({"task_id": f"t-{i:03d}", "type": "data_analysis"}).encode(),
            timeout=3,
        )
    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 4: Agent detects anomaly, broadcasts alert
# ============================================================
async def scenario4_anomaly_broadcast():
    nc = await nats.connect(SERVER)
    # Create public alert mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "alert.payment"}).encode(),
        timeout=3,
    )

    async def on_alert_a(msg):
        print(f"[Handler-A] {msg.data.decode()}")

    async def on_alert_b(msg):
        print(f"[Handler-B] {msg.data.decode()}")

    # Two handlers subscribe independently (no queue group): both receive every message
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_a)
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_b)
    await asyncio.sleep(0.2)
    # Monitoring agent publishes critical alert
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.alert.payment.critical",
        json.dumps({"level": "critical", "detail": "payment timeout rate spiked to 15%"}).encode(),
        timeout=3,
    )
    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 5: Cloud sends commands to offline edge agent
# ============================================================
async def scenario5_edge_offline_message():
    nc = await nats.connect(SERVER)
    # Edge agent registers mailbox on startup
    reply = await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"ttl": 3600, "prefix": "edge"}).encode(),
        timeout=3,
    )
    edge_inbox = json.loads(reply.data)["mail_address"]
    print(f"[Edge] mailbox: {edge_inbox}")
    # Cloud sends three priority levels (edge is simulated offline; messages are persisted)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.critical",
                     json.dumps({"command": "emergency_stop"}).encode(), timeout=3)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.urgent",
                     json.dumps({"command": "update_config", "interval": 30}).encode(), timeout=3)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}",
                     json.dumps({"command": "report_status"}).encode(), timeout=3)
    print("[Cloud] commands sent, waiting for edge to come online...")

    # Edge agent "comes online": subscribes and receives in priority order
    async def on_command(msg):
        print(f"[Edge] received command: {msg.data.decode()}")

    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}", cb=on_command)
    # Safety net: LIST to verify nothing was missed
    list_reply = await nc.request(f"$mq9.AI.MAILBOX.LIST.{edge_inbox}", b"", timeout=3)
    print(f"[Edge] LIST safety net: {list_reply.data.decode()}")
    await asyncio.sleep(1)
    await nc.close()


# ============================================================
# Scenario 6: Human-in-the-loop approval workflow
# ============================================================
async def scenario6_human_approval():
    nc = await nats.connect(SERVER)
    # Approver creates public mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "approver.zhang"}).encode(),
        timeout=3,
    )
    # Agent creates private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    agent_inbox = json.loads(reply.data)["mail_address"]
    approved = asyncio.Event()

    async def on_approval_result(msg):
        result = json.loads(msg.data)
        print(f"[Agent] received approval result: approved={result['approved']}")
        approved.set()

    async def on_approval_request(msg):
        req = json.loads(msg.data)
        print(f"[Approver] received request: {req['content']}")
        # Approve and reply to the agent's mailbox, echoing the correlation_id
        await nc.request(
            f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
            json.dumps({"approved": True, "correlation_id": req["correlation_id"]}).encode(),
            timeout=3,
        )

    await nc.subscribe("$mq9.AI.MAILBOX.MSG.approver.zhang", cb=on_approval_request)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{agent_inbox}", cb=on_approval_result)
    await asyncio.sleep(0.2)
    # Agent sends approval request
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.approver.zhang.urgent",
        json.dumps({
            "type": "approval_request",
            "content": "requesting external API call, estimated cost $50",
            "reply_to": agent_inbox,
            "correlation_id": "approval-001",
        }).encode(),
        timeout=3,
    )
    await asyncio.wait_for(approved.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 7: Agent A asks Agent B a question; B may be offline
# ============================================================
async def scenario7_async_request_reply():
    nc = await nats.connect(SERVER)
    # A and B each create their own mailbox
    reply_a = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    inbox_a = json.loads(reply_a.data)["mail_address"]
    reply_b = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
    inbox_b = json.loads(reply_b.data)["mail_address"]
    answered = asyncio.Event()

    async def on_question(msg):
        req = json.loads(msg.data)
        print(f"[Agent-B] received question: {req['question']}")
        await nc.request(
            f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
            json.dumps({"answer": "current progress is 72%", "correlation_id": req["correlation_id"]}).encode(),
            timeout=3,
        )

    async def on_answer(msg):
        print(f"[Agent-A] received reply: {msg.data.decode()}")
        answered.set()

    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_b}", cb=on_question)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_a}", cb=on_answer)
    await asyncio.sleep(0.2)
    # A asks B a question
    await nc.request(
        f"$mq9.AI.MAILBOX.MSG.{inbox_b}",
        json.dumps({
            "question": "what is the current progress?",
            "reply_to": inbox_a,
            "correlation_id": "req-001",
        }).encode(),
        timeout=3,
    )
    await asyncio.wait_for(answered.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 8: Agent registers capabilities; others discover and call it
# ============================================================
async def scenario8_capability_discovery():
    nc = await nats.connect(SERVER)
    worker_caps = ["data.analysis", "report.generation"]
    # Create public query mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "capability.query"}).encode(),
        timeout=3,
    )
    # Orchestrator creates private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    orchestrator_inbox = json.loads(reply.data)["mail_address"]
    # Worker-001 registers capabilities (public mailbox + capability declaration)
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "capability.worker001", "ttl": 60}).encode(),
        timeout=3,
    )
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.capability.worker001",
        json.dumps({"capabilities": worker_caps, "load": 0.3}).encode(),
        timeout=3,
    )

    async def on_query(msg):
        req = json.loads(msg.data)
        # Reply only on an exact match with a declared capability
        if req.get("need") in worker_caps:
            print("[Worker-001] responding to capability query")
            await nc.request(
                f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
                json.dumps({
                    "from": "capability.worker001",
                    "capabilities": worker_caps,
                    "load": 0.3,
                }).encode(),
                timeout=3,
            )

    async def on_response(msg):
        print(f"[Orchestrator] found capable agent: {msg.data.decode()}")

    await nc.subscribe("$mq9.AI.MAILBOX.MSG.capability.query", cb=on_query)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{orchestrator_inbox}", cb=on_response)
    await asyncio.sleep(0.2)
    # Orchestrator broadcasts query
    print("[Orchestrator] querying: who has data.analysis capability?")
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.capability.query",
        json.dumps({"need": "data.analysis", "reply_to": orchestrator_inbox}).encode(),
        timeout=3,
    )
    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# main
# ============================================================
async def main():
    print("=== Scenario 1: Async Task Result Return ===")
    await scenario1_async_task_return()
    print("\n=== Scenario 2: Global Status Awareness ===")
    await scenario2_global_status_awareness()
    print("\n=== Scenario 3: Task Fan-out and Competing Consumers ===")
    await scenario3_task_broadcast_compete()
    print("\n=== Scenario 4: Anomaly Alert Broadcast ===")
    await scenario4_anomaly_broadcast()
    print("\n=== Scenario 5: Edge Offline Message Backlog ===")
    await scenario5_edge_offline_message()
    print("\n=== Scenario 6: Human-in-the-Loop Workflow ===")
    await scenario6_human_approval()
    print("\n=== Scenario 7: Async Request-Reply ===")
    await scenario7_async_request_reply()
    print("\n=== Scenario 8: Capability Registration and Discovery ===")
    await scenario8_capability_discovery()


if __name__ == "__main__":
    asyncio.run(main())
```