Skip to content

mq9 vs Existing Solutions: Eight Scenarios Side by Side

Agent-to-agent communication has no standard answer today. Everyone is working around it with whatever is at hand — HTTP, Redis, Kafka, home-grown queues. It works, but every scenario requires adding something extra.

This article takes the eight core scenarios mq9 is designed to address and compares them one by one: how existing solutions handle them, where the pain is, how mq9 handles them, and how big the advantage is. No hype, just facts.

Protocol basics: mq9 has exactly four commands: MAILBOX.CREATE, MAILBOX.MSG.{mail_address}, MAILBOX.LIST.{mail_address}, and MAILBOX.DELETE.{mail_address}.{msg_id}. Subscriptions use MAILBOX.MSG.{mail_address} with an optional queue group. All operations require the precise mail_address — wildcard cross-mailbox subscriptions are not allowed.


Scenario 1: Sub-Agent Completes Task, Notifies Orchestrator Asynchronously

HTTP: The sub-agent calls back the orchestrator's webhook. The orchestrator must be online and expose an endpoint. On restart or network blip, the callback is lost — retry logic, idempotency, and timeout handling all have to be built by hand, every project.

Redis: Sub-agent publishes to a channel; orchestrator subscribes. If the orchestrator is offline at that moment, the message is gone. For reliable delivery you have to add Redis Streams or a List, essentially hand-rolling a message queue on top of Redis.

Kafka: Create a topic, sub-agent produces, orchestrator consumes. It works, but you need to pre-create the topic, configure partitions and retention, and manage consumer groups and offsets. A simple task result triggers a full Kafka operations story.

mq9: The orchestrator calls MAILBOX.CREATE to get a mail_address; the sub-agent uses MAILBOX.MSG.{mail_address} to send the result. If the orchestrator is online it receives immediately; if offline the message is persisted and pushed on reconnect. MAILBOX.LIST can be used as a safety net to confirm nothing was missed.

bash
# Orchestrator creates its mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":3600}'
# → {"mail_address":"abc123ef"}

# Sub-agent sends result (normal priority)
nats request '$mq9.AI.MAILBOX.MSG.abc123ef' '{"task_id":"t-001","result":"analysis complete"}'
# → {"msg_id":0}

# Orchestrator subscribes (including unread history)
nats subscribe '$mq9.AI.MAILBOX.MSG.abc123ef'

# Safety net: pull mailbox contents proactively
nats request '$mq9.AI.MAILBOX.LIST.abc123ef' ''

Advantage: Medium. No requirement for the other side to be online vs HTTP; native persistence vs Redis; zero ops configuration vs Kafka.


Scenario 2: Orchestrator Monitors All Worker Agent Status

HTTP polling: The orchestrator periodically requests each sub-agent's /health endpoint. Requires maintaining an agent list — register on start, deregister on exit. N agents means N requests, O(n) overhead. Too long an interval means slow detection; too short wastes resources.

Redis: Each agent writes a key with TTL; the orchestrator uses SCAN to iterate. O(n) operation, degrades with many agents. Redis has no reliable native "subscribe to key changes" — polling only.

Kafka: Each agent writes its state to a compacted topic. Works, but compaction has latency. Every write is a persisted log entry — full persistence overhead just to track the latest state.

mq9: Each worker creates a public mailbox on startup with a well-known name (e.g., status.worker001), reports status periodically via MAILBOX.MSG, and sets a TTL for its mailbox lifetime. The orchestrator subscribes to each known worker's mailbox. When a worker stops updating, the TTL expiry removes its mailbox automatically — no explicit deregistration needed.

bash
# Worker-001 starts, registers public status mailbox
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":60,"public":true,"name":"status.worker001","desc":"worker status"}'

# Worker-001 reports status periodically (every 30s)
nats request '$mq9.AI.MAILBOX.MSG.status.worker001' '{"status":"running","load":0.3}'

# Orchestrator subscribes to each known worker's mailbox
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker001'
nats subscribe '$mq9.AI.MAILBOX.MSG.status.worker002'

Advantage: High. Zero polling; TTL automatically detects worker exit; new workers register with a known name and the orchestrator subscribes on demand.


Scenario 3: Task Fan-Out, Multiple Workers Compete to Consume

HTTP: The orchestrator must implement load balancing, maintain a worker list, pick one, and send. Worker failure means retry and failover. Writing your own scheduler and fault-tolerance logic every time.

Redis: Orchestrator does LPUSH to a List; workers use BRPOP to compete. Most common approach, but no acknowledgement — a worker that BRPOPs a task and crashes loses it. Need to build a reliable queue on top. No priority, no fan-out capability.

Kafka: Publish to a topic; multiple consumers in the same group compete. Works, but partition count sets the parallelism ceiling; scaling workers dynamically requires rebalancing; rebalancing causes a consumption pause.

mq9: Create a public task mailbox (e.g., task.queue); the orchestrator delivers tasks via MAILBOX.MSG; workers subscribe with a queue group to compete — each message goes to exactly one worker. Workers can join or leave dynamically with no configuration change.

bash
# Orchestrator creates public task mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"task.queue","desc":"task queue"}'

# Orchestrator delivers tasks
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-001","type":"data_analysis"}'
nats request '$mq9.AI.MAILBOX.MSG.task.queue' '{"task_id":"t-002","type":"data_analysis"}'

# Workers compete using a queue group
nats subscribe '$mq9.AI.MAILBOX.MSG.task.queue' --queue task.workers
# Worker-1, Worker-2, Worker-3 each run the above; each message goes to exactly one

Advantage: Medium. No worker list to maintain vs HTTP; no BRPOP loss risk vs Redis; no rebalance vs Kafka. Note: mq9 is early-stage — production stability needs validation.


Scenario 4: Agent Detects Anomaly, Broadcasts Alert

HTTP: The detecting agent needs to know who can handle it, maintain a subscriber list, and POST to each one individually. Maintaining that list is itself a distributed systems problem.

Redis: PUBLISH to a channel; all subscribers receive. Works, but anyone offline at that moment misses it — no persistence. An alert fires the instant a handler restarts — the alert is lost.

Kafka: Publish to a topic; each handler consumes with its own consumer group. Fan-out works, but each handler needs its own group configured and the topic must be pre-created.

mq9: Create a public alert mailbox (e.g., alert.payment); the monitoring agent publishes alerts there; all interested agents subscribe independently. Messages are persisted — critical alerts are not missed on reconnect. The publisher does not need to know who is listening.

bash
# Create public alert mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"alert.payment","desc":"payment domain alerts"}'

# Monitoring agent publishes critical alert
nats request '$mq9.AI.MAILBOX.MSG.alert.payment.critical' '{"level":"critical","detail":"payment timeout rate spiked to 15%"}'

# Alert handler A subscribes
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'

# Alert handler B also subscribes (same mailbox, each receives independently)
nats subscribe '$mq9.AI.MAILBOX.MSG.alert.payment'

Advantage: Medium. No subscriber list to maintain vs HTTP; persistent delivery vs Redis; no pre-configured consumer groups vs Kafka. Each handler needs to know the alert mailbox name (fixed by convention).


Scenario 5: Cloud Sends Commands to Offline Edge Agent

HTTP: Fails directly. Edge is offline, request times out. You have to build a retry queue, exponential backoff, and command persistence — essentially hand-rolling a message queue in the cloud.

Redis: Write commands to a List; edge consumes on reconnect. No priority — emergency and routine commands queue up together. To add priority you need Sorted Set, increasing complexity. Redis is in-memory; large backlogs are expensive.

Kafka: Write to a topic; edge consumes when it comes online. Works, but no native priority — strict FIFO within a partition. Urgent commands need a separate topic; the consumer must subscribe to multiple topics and do its own priority scheduling.

mq9: Each edge agent has its own mail_address (created on startup, communicated to the cloud). The cloud uses MAILBOX.MSG.{mail_address}.critical for emergency commands, .urgent for important commands, no suffix for normal commands. When the edge reconnects and subscribes to its mailbox, the server pushes messages in critical → urgent → normal order. MAILBOX.LIST is available as a safety net.

bash
# Cloud: send priority-graded commands (edge may be offline — messages are persisted)
nats request '$mq9.AI.MAILBOX.MSG.edge.device.critical' '{"command":"emergency_stop","reason":"temperature too high"}'
nats request '$mq9.AI.MAILBOX.MSG.edge.device.urgent'   '{"command":"update_config","interval":30}'
nats request '$mq9.AI.MAILBOX.MSG.edge.device'          '{"command":"report_status"}'

# Edge agent reconnects and subscribes to its mailbox (pushed in priority order)
nats subscribe '$mq9.AI.MAILBOX.MSG.edge.device'

# Safety net: proactively pull to check for missed messages
nats request '$mq9.AI.MAILBOX.LIST.edge.device' ''

Advantage: High. No requirement for the other side to be online vs HTTP; three-level priority with low-cost persistence (RocksDB) vs Redis; native priority without multiple topics vs Kafka.


Scenario 6: Human-in-the-Loop Approval Workflow

HTTP + frontend: Agent calls an approval service API to create a ticket; the frontend polls or uses WebSocket to push to the approver; the approver acts and triggers a callback to the agent. At least three systems wired together: Agent → approval service → frontend → approval service → Agent.

Redis: Agent writes approval request to a key; approver client polls; approver writes result to another key; agent polls for the result. Both sides polling, high latency, logic scattered everywhere.

Kafka: Approval requests go to one topic; approval results go to another. Works, but the approver needs a Kafka client — unfriendly for non-technical users.

mq9: The approver has their own mail_address (a well-known public mailbox such as approver.zhang). The agent sends the approval request to MAILBOX.MSG.{approver_mail_address}.urgent; the approver's client subscribes to their own mailbox; after approval they reply via MAILBOX.MSG.{agent_mail_address}. Humans and agents use the exact same protocol — no separate approval service needed. A correlation_id in the payload ensures correct pairing.

bash
# Approver's mailbox (pre-created public mailbox, name is the address)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"approver.zhang","desc":"approver mailbox"}'

# Agent sends approval request (urgent priority)
nats request '$mq9.AI.MAILBOX.MSG.approver.zhang.urgent' '{
  "from":"agent",
  "type":"approval_request",
  "correlation_id":"approval-001",
  "content":"requesting external API call, estimated cost $50"
}'

# Approver client subscribes to own mailbox
nats subscribe '$mq9.AI.MAILBOX.MSG.approver.zhang'

# Approver replies to the agent's mailbox
nats request '$mq9.AI.MAILBOX.MSG.agent' '{
  "from":"approver.zhang",
  "approved":true,
  "correlation_id":"approval-001"
}'

# Agent subscribes to its own mailbox to receive the approval result
nats subscribe '$mq9.AI.MAILBOX.MSG.agent'

Advantage: High. Humans and agents share one protocol; no separate approval service; messages are persisted — approver going offline loses nothing.


Scenario 7: Agent A Asks Agent B a Question — B May Be Offline

HTTP: B is offline → immediate failure. For async, you need a callback URL, correlation_id, retry logic, and a state machine — complexity explodes.

Redis: A writes the request to B's List; B writes the result to A's List. Two Lists, both sides polling, correlation_id matching by hand. Works, but non-trivial code.

Kafka: A sends request to a request topic with a reply_to header; B produces the response to a reply topic. Two topics, two consumer groups; A must filter the reply topic for responses addressed to itself.

mq9: A creates its own private mailbox; sends the question to MAILBOX.MSG.{B_mail_address} with reply_to (A's mail_address) and correlation_id in the payload. B subscribes to its mailbox when online, processes the question, and replies via MAILBOX.MSG.{reply_to}. While B is offline the request is persisted in B's mailbox, pushed when B reconnects.

bash
# A creates its mailbox to receive the reply
nats request '$mq9.AI.MAILBOX.CREATE' '{"ttl":300}'
# → {"mail_address":"agent.a"}

# A asks B a question with reply_to
nats request '$mq9.AI.MAILBOX.MSG.agent.b' '{
  "from":"agent.a",
  "question":"what is the current progress?",
  "reply_to":"agent.a",
  "correlation_id":"req-001"
}'

# B subscribes to its mailbox (runs when B comes online)
nats subscribe '$mq9.AI.MAILBOX.MSG.agent.b'

# B replies to reply_to after processing
nats request '$mq9.AI.MAILBOX.MSG.agent.a' '{
  "from":"agent.b",
  "answer":"current progress is 72%",
  "correlation_id":"req-001"
}'

# A subscribes to its own mailbox waiting for the reply
nats subscribe '$mq9.AI.MAILBOX.MSG.agent.a'

Advantage: Medium. B being offline does not lose messages; A does not have to wait online; fully async on both sides. No two-topic setup or reply-filtering logic vs Kafka.


Scenario 8: Agent Registers Capabilities, Others Discover and Call It

HTTP + registry: Requires a centralized service registry (Consul, Eureka, custom-built). Agent registers on start, deregisters on exit, or relies on health checks for pruning. Introduces an additional stateful service that itself needs high-availability deployment.

Redis: Agent writes capabilities to a Hash or Set with TTL; querying side uses SCAN or SMEMBERS. Works, but no mechanism to subscribe to capability changes — polling only. SCAN degrades with many agents.

Kafka: Capability declarations go to a compacted topic; querying side consumes the entire topic to build a local view. High latency, complex implementation — a full Kafka consumer chain just for simple service discovery.

mq9: Each agent creates a public mailbox with a well-known name on startup (e.g., capability.worker001) and posts a capability declaration. TTL controls registration validity — no renewal means automatic expiry. The orchestrator needing discovery creates a public query mailbox (e.g., capability.query), broadcasts its query there, and capable agents reply directly to the orchestrator's mailbox.

bash
# Worker-001 registers capabilities (TTL=60, must renew periodically)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.worker001","ttl":60}'
nats request '$mq9.AI.MAILBOX.MSG.capability.worker001' '{
  "capabilities":["data.analysis","report.generation"],
  "load":0.3,
  "reply_to":"capability.worker001"
}'

# Orchestrator creates query mailbox (one-time)
nats request '$mq9.AI.MAILBOX.CREATE' '{"public":true,"name":"capability.query"}'

# Workers subscribe to query mailbox and reply when queried
nats subscribe '$mq9.AI.MAILBOX.MSG.capability.query'

# Orchestrator broadcasts: who has data.analysis capability?
nats request '$mq9.AI.MAILBOX.MSG.capability.query' '{
  "from":"orchestrator",
  "need":"data.analysis",
  "reply_to":"orchestrator"
}'

# Orchestrator subscribes to its own mailbox to receive responses
nats subscribe '$mq9.AI.MAILBOX.MSG.orchestrator'

Advantage: High. Zero extra registry; TTL automatically detects agent exit; fully decentralized.


Summary

ScenarioHTTPRedisKafkamq9mq9 Advantage
Sub-agent completes task, notifies orchestrator asynchronouslyRequires other side online; manual retryNo persistence; hand-roll reliable queueWorks; high ops overheadCREATE + MSG + LIST safety netMedium
Orchestrator monitors all worker statusO(n) pollingSCAN pollingCompacted topic; lagPublic mailbox (status.worker001) + TTL auto-expiryHigh
Task fan-out, multiple workers competeBuild own schedulerBRPOP — unreliableRebalance causes pausePublic mailbox (task.queue) + queue group; zero configMedium
Agent detects anomaly, broadcasts alertMaintain subscriber listNo persistenceNeed multiple consumer groupsPublic alert mailbox (alert.payment) + multi-subscriberMedium
Cloud sends commands to offline edge agentImmediate failureNo priority; high memory costNo native priorityMSG three-level priority + persistence (edge.device)High
Agent sends approval request; human approves and flow continuesAt least three systemsBoth sides pollingApprover needs Kafka clientHuman and agent share one protocol, bidirectional MSG (approver.zhang)High
Agent A asks Agent B a question; B may be offlineImmediate failure if offlineTwo Lists + both sides pollingTwo topics + filter responsesMSG + reply_to; offline-safe (agent.a, agent.b)Medium
Agent registers capabilities; others discover and callNeeds registrySCAN pollingCompacted topic; complexPublic mailbox (capability.query) + TTL auto-cleanupHigh

Eight scenarios: 4 high-advantage, 4 medium-advantage. mq9's core value is not dominating on a single dimension — it is that no scenario requires a workaround. Four commands cover everything directly, with zero extra logic.

Worth noting: mq9 is still early-stage; its ecosystem maturity is nowhere near any of the solutions above. If your use case is big-data pipelines and high-throughput stream processing, Kafka remains the optimal choice. The comparisons above are specific to agent-to-agent async communication.


Mailbox Behavior Mapping

The complete behaviors of a real-world mailbox, mapped to mq9's four commands:

Real-world mailboxmq9 commandNotes
Open a mailboxMAILBOX.CREATEGet a mail_address; optional TTL, public flag, prefix
Send a letterMAILBOX.MSG.{mail_address}[.urgent|.critical]Recipient doesn't need to be home; supports urgent/normal
Receive lettersSUB $mq9.AI.MAILBOX.MSG.{mail_address}Online: real-time push in critical→urgent→normal order
Check the PO boxMAILBOX.LIST.{mail_address}Active pull; see what messages remain in the mailbox
Discard a letterMAILBOX.DELETE.{mail_address}.{msg_id}Explicit cleanup after processing
Mailbox expiresTTL auto-destroyUnused long enough — the post office cancels it automatically
Bulletin boardPublic mailbox + multi-subscriber SUBCreate a public mailbox; anyone who knows the name can read it
First come, first servedQueue groupAdd --queue on subscribe; each message goes to exactly one subscriber

Python Code Examples

Eight scenarios, eight functions, all using the native NATS Python SDK (nats-py) — zero custom wrappers.

python
import asyncio
import json
import nats

SERVER = "nats://localhost:4222"

# ============================================================
# Scenario 1: Sub-agent completes task, notifies orchestrator asynchronously
# ============================================================
async def scenario1_async_task_return():
    nc = await nats.connect(SERVER)

    # Orchestrator: create private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
    data = json.loads(reply.data)
    main_inbox = data["mail_address"]
    print(f"[Orchestrator] mailbox: {main_inbox}")

    received = asyncio.Event()

    async def on_message(msg):
        print(f"[Orchestrator] received result: {msg.data.decode()}")
        received.set()

    # Orchestrator subscribes to its own mailbox
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{main_inbox}", cb=on_message)

    # Sub-agent: task complete, send result
    await nc.request(
        f"$mq9.AI.MAILBOX.MSG.{main_inbox}",
        json.dumps({"task_id": "t-001", "result": "analysis complete, anomaly rate 2.3%"}).encode(),
        timeout=3
    )
    await asyncio.wait_for(received.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 2: Orchestrator monitors all worker status
# ============================================================
async def scenario2_global_status_awareness():
    nc = await nats.connect(SERVER)

    # Each worker creates a public status mailbox (TTL=60, renew periodically)
    for i in range(1, 3):
        await nc.request(
            "$mq9.AI.MAILBOX.CREATE",
            json.dumps({"public": True, "name": f"status.worker{i:03d}", "ttl": 60}).encode(),
            timeout=3
        )

    async def on_status(msg):
        print(f"[Orchestrator] status update: {msg.subject} -> {msg.data.decode()}")

    # Orchestrator subscribes to known workers' status mailboxes
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker001", cb=on_status)
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.status.worker002", cb=on_status)

    # Worker-001 reports status
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.status.worker001",
        json.dumps({"status": "running", "load": 0.3, "capabilities": ["data.analysis"]}).encode(),
        timeout=3
    )
    # Worker-002 reports status
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.status.worker002",
        json.dumps({"status": "idle", "load": 0.0, "capabilities": ["text.generation"]}).encode(),
        timeout=3
    )
    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 3: Task fan-out, multiple workers compete to consume
# ============================================================
async def scenario3_task_broadcast_compete():
    nc = await nats.connect(SERVER)

    # Create public task mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "task.queue"}).encode(),
        timeout=3
    )

    for i in range(1, 4):
        worker_id = i
        async def on_task(msg, wid=worker_id):
            print(f"[Worker-{wid}] claimed task: {msg.data.decode()}")

        # Join queue group — compete within the same group
        await nc.subscribe("$mq9.AI.MAILBOX.MSG.task.queue", queue="task.workers", cb=on_task)

    await asyncio.sleep(0.2)

    # Orchestrator delivers tasks
    for i in range(1, 4):
        await nc.request(
            "$mq9.AI.MAILBOX.MSG.task.queue",
            json.dumps({"task_id": f"t-{i:03d}", "type": "data_analysis"}).encode(),
            timeout=3
        )

    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 4: Agent detects anomaly, broadcasts alert
# ============================================================
async def scenario4_anomaly_broadcast():
    nc = await nats.connect(SERVER)

    # Create public alert mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "alert.payment"}).encode(),
        timeout=3
    )

    async def on_alert_a(msg):
        print(f"[Handler-A] {msg.data.decode()}")

    async def on_alert_b(msg):
        print(f"[Handler-B] {msg.data.decode()}")

    # Two handlers each subscribe independently — both receive every message
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_a)
    await nc.subscribe("$mq9.AI.MAILBOX.MSG.alert.payment", cb=on_alert_b)

    await asyncio.sleep(0.2)

    # Monitoring agent publishes critical alert
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.alert.payment.critical",
        json.dumps({"level": "critical", "detail": "payment timeout rate spiked to 15%"}).encode(),
        timeout=3
    )

    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# Scenario 5: Cloud sends commands to offline edge agent
# ============================================================
async def scenario5_edge_offline_message():
    nc = await nats.connect(SERVER)

    # Edge agent registers mailbox on startup
    reply = await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"ttl": 3600, "prefix": "edge"}).encode(),
        timeout=3
    )
    edge_inbox = json.loads(reply.data)["mail_address"]
    print(f"[Edge] mailbox: {edge_inbox}")

    # Cloud sends three priority levels (edge is simulated offline — messages are persisted)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.critical",
                     json.dumps({"command": "emergency_stop"}).encode(), timeout=3)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}.urgent",
                     json.dumps({"command": "update_config", "interval": 30}).encode(), timeout=3)
    await nc.request(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}",
                     json.dumps({"command": "report_status"}).encode(), timeout=3)
    print("[Cloud] commands sent, waiting for edge to come online...")

    # Edge agent "comes online" — subscribes and receives in priority order
    async def on_command(msg):
        print(f"[Edge] received command: {msg.data.decode()}")

    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{edge_inbox}", cb=on_command)

    # Safety net: LIST to verify nothing was missed
    list_reply = await nc.request(f"$mq9.AI.MAILBOX.LIST.{edge_inbox}", b"", timeout=3)
    print(f"[Edge] LIST safety net: {list_reply.data.decode()}")

    await asyncio.sleep(1)
    await nc.close()


# ============================================================
# Scenario 6: Human-in-the-loop approval workflow
# ============================================================
async def scenario6_human_approval():
    nc = await nats.connect(SERVER)

    # Approver creates public mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "approver.zhang"}).encode(),
        timeout=3
    )

    # Agent creates private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    agent_inbox = json.loads(reply.data)["mail_address"]

    approved = asyncio.Event()

    async def on_approval_result(msg):
        result = json.loads(msg.data)
        print(f"[Agent] received approval result: approved={result['approved']}")
        approved.set()

    async def on_approval_request(msg):
        req = json.loads(msg.data)
        print(f"[Approver] received request: {req['content']}")
        # Approve and reply to agent
        await nc.request(
            f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
            json.dumps({"approved": True, "correlation_id": req["correlation_id"]}).encode(),
            timeout=3
        )

    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.approver.zhang", cb=on_approval_request)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{agent_inbox}", cb=on_approval_result)
    await asyncio.sleep(0.2)

    # Agent sends approval request
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.approver.zhang.urgent",
        json.dumps({
            "type": "approval_request",
            "content": "requesting external API call, estimated cost $50",
            "reply_to": agent_inbox,
            "correlation_id": "approval-001"
        }).encode(),
        timeout=3
    )

    await asyncio.wait_for(approved.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 7: Agent A asks Agent B a question — B may be offline
# ============================================================
async def scenario7_async_request_reply():
    nc = await nats.connect(SERVER)

    # A and B each create their own mailbox
    reply_a = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    inbox_a = json.loads(reply_a.data)["mail_address"]

    reply_b = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":3600}', timeout=3)
    inbox_b = json.loads(reply_b.data)["mail_address"]

    answered = asyncio.Event()

    async def on_question(msg):
        req = json.loads(msg.data)
        print(f"[Agent-B] received question: {req['question']}")
        await nc.request(
            f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
            json.dumps({"answer": "current progress is 72%", "correlation_id": req["correlation_id"]}).encode(),
            timeout=3
        )

    async def on_answer(msg):
        print(f"[Agent-A] received reply: {msg.data.decode()}")
        answered.set()

    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_b}", cb=on_question)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{inbox_a}", cb=on_answer)
    await asyncio.sleep(0.2)

    # A asks B a question
    await nc.request(
        f"$mq9.AI.MAILBOX.MSG.{inbox_b}",
        json.dumps({
            "question": "what is the current progress?",
            "reply_to": inbox_a,
            "correlation_id": "req-001"
        }).encode(),
        timeout=3
    )

    await asyncio.wait_for(answered.wait(), timeout=5)
    await nc.close()


# ============================================================
# Scenario 8: Agent registers capabilities; others discover and call it
# ============================================================
async def scenario8_capability_discovery():
    nc = await nats.connect(SERVER)

    # Create public query mailbox
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "capability.query"}).encode(),
        timeout=3
    )

    # Orchestrator creates private mailbox
    reply = await nc.request("$mq9.AI.MAILBOX.CREATE", b'{"ttl":300}', timeout=3)
    orchestrator_inbox = json.loads(reply.data)["mail_address"]

    # Worker-001 registers capabilities (public mailbox + capability declaration)
    await nc.request(
        "$mq9.AI.MAILBOX.CREATE",
        json.dumps({"public": True, "name": "capability.worker001", "ttl": 60}).encode(),
        timeout=3
    )
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.capability.worker001",
        json.dumps({"capabilities": ["data.analysis", "report.generation"], "load": 0.3}).encode(),
        timeout=3
    )

    async def on_query(msg):
        req = json.loads(msg.data)
        if "data.analysis" in req.get("need", ""):
            print("[Worker-001] responding to capability query")
            await nc.request(
                f"$mq9.AI.MAILBOX.MSG.{req['reply_to']}",
                json.dumps({
                    "from": "capability.worker001",
                    "capabilities": ["data.analysis", "report.generation"],
                    "load": 0.3
                }).encode(),
                timeout=3
            )

    async def on_response(msg):
        print(f"[Orchestrator] found capable agent: {msg.data.decode()}")

    await nc.subscribe("$mq9.AI.MAILBOX.MSG.capability.query", cb=on_query)
    await nc.subscribe(f"$mq9.AI.MAILBOX.MSG.{orchestrator_inbox}", cb=on_response)
    await asyncio.sleep(0.2)

    # Orchestrator broadcasts query
    print("[Orchestrator] querying: who has data.analysis capability?")
    await nc.request(
        "$mq9.AI.MAILBOX.MSG.capability.query",
        json.dumps({"need": "data.analysis", "reply_to": orchestrator_inbox}).encode(),
        timeout=3
    )

    await asyncio.sleep(0.5)
    await nc.close()


# ============================================================
# main
# ============================================================
async def main():
    print("=== Scenario 1: Async Task Result Return ===")
    await scenario1_async_task_return()

    print("\n=== Scenario 2: Global Status Awareness ===")
    await scenario2_global_status_awareness()

    print("\n=== Scenario 3: Task Fan-out and Competing Consumers ===")
    await scenario3_task_broadcast_compete()

    print("\n=== Scenario 4: Anomaly Alert Broadcast ===")
    await scenario4_anomaly_broadcast()

    print("\n=== Scenario 5: Edge Offline Message Backlog ===")
    await scenario5_edge_offline_message()

    print("\n=== Scenario 6: Human-in-the-Loop Workflow ===")
    await scenario6_human_approval()

    print("\n=== Scenario 7: Async Request-Reply ===")
    await scenario7_async_request_reply()

    print("\n=== Scenario 8: Capability Registration and Discovery ===")
    await scenario8_capability_discovery()

asyncio.run(main())
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀