Async Communication in LangChain Agents with mq9
Where the Problem Starts
Say you're building a multi-agent system with LangChain. One agent fetches data, another analyzes it, and a third generates reports.
The most straightforward approach is chained calls: Agent A calls Agent B, B calls C, all synchronous. Simple code, clear logic.
But you'll quickly run into problems:
- Agent A takes 30 seconds to fetch data. Agent B is blocked, doing nothing.
- Agent B crashes. The whole pipeline breaks and Agent A's result is lost.
- Agent C has limited throughput. Traffic spikes cause a backlog with no buffer upstream.
- You want Agent A to fan out to three different agents simultaneously. A plain chain of blocking calls can only reach them one after another.
This isn't a code style problem — it's an architectural one. Synchronous calls couple agents together. Any delay or failure at one node propagates through the entire chain.
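To make the failure-propagation point concrete, here is a minimal sketch in plain Python, with no LangChain or mq9 involved. The names `fetch_data`, `analyze`, `synchronous_chain`, and `buffered_step` are hypothetical stand-ins for Agent A, Agent B, and a consumer reading from a buffer. The `deque` only models the idea of a buffer sitting between two agents.

```python
from collections import deque


def fetch_data():
    # Stand-in for Agent A: pretend this took 30 seconds to produce.
    return ["record_1", "record_2", "record_3"]


def analyze(records, fail=False):
    # Stand-in for Agent B; fail=True simulates a crash mid-processing.
    if fail:
        raise RuntimeError("Agent B crashed")
    return f"analyzed {len(records)} records"


def synchronous_chain(fail=False):
    # A calls B directly: if B raises, A's result is lost with the call stack.
    return analyze(fetch_data(), fail=fail)


def buffered_step(buffer, fail=False):
    # B peeks at the oldest item and removes it only after it succeeds,
    # so a crash leaves the item in the buffer for a later retry.
    result = analyze(buffer[0], fail=fail)
    buffer.popleft()
    return result


buffer = deque([fetch_data()])        # A's result is stored before B runs
try:
    buffered_step(buffer, fail=True)  # first attempt: B crashes
except RuntimeError:
    pass                              # the data is still sitting in the buffer
retry_result = buffered_step(buffer)  # a restarted B picks it up
```

In the direct chain, the same crash would force Agent A to fetch everything again. With a buffer in between, only the failed step is retried.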
What mq9 Is
mq9 is a messaging protocol designed specifically for async agent communication. Its core concept is the Mailbox.
Each agent has a mailbox. Other agents drop messages into it; the agent picks them up whenever it's ready. Messages are persisted and automatically cleaned up when their TTL expires. Sender and receiver don't need to be online at the same time.
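The Mailbox idea can be modeled in a few lines of plain Python. This is only an illustrative in-memory sketch, not how mq9 stores messages: `ToyMailbox` is a hypothetical class, and it assumes expiry is measured per message from the time it was sent. The clock is passed in explicitly as `now` so the behavior is easy to follow.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMailbox:
    ttl: float                                    # seconds a message stays readable
    _messages: list = field(default_factory=list)

    def send(self, body, now):
        # Store the message together with its expiry time; the sender is done.
        self._messages.append((now + self.ttl, body))

    def fetch(self, now):
        # Drop expired messages, then return the rest in arrival order.
        self._messages = [(exp, body) for exp, body in self._messages if exp > now]
        return [body for _, body in self._messages]


mailbox = ToyMailbox(ttl=3600)
mailbox.send("dataset_v1", now=0)      # sender does not wait for a reader
mailbox.send("dataset_v2", now=3000)
early = mailbox.fetch(now=100)         # both messages are still within TTL
later = mailbox.fetch(now=3600)        # dataset_v1 has expired by now
```

The sender and the reader never interact directly; the only thing they share is the mailbox.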
Key features:
- Mailbox is an agent's address — private (UUID) or public (custom name)
- Priority supports three levels: critical / urgent / normal; critical messages are consumed first
- Store-first delivery: on subscribe, all unexpired historical messages are replayed before live messages
- Queue group: multiple agents share a group, each message is processed exactly once
mq9 is built on the NATS protocol and provides a native LangChain Toolkit integration — just a few lines of code to plug in.
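The queue group feature listed above differs from a plain subscription: members of a group split the messages between them instead of each receiving a copy. The sketch below models that with a hypothetical helper, `dispatch_to_group`. The round-robin distribution is an assumption chosen for simplicity; the text does not say how mq9 picks a group member.

```python
from itertools import cycle


def dispatch_to_group(messages, workers):
    # Hand each message to one worker only, cycling through the group.
    assignments = {worker: [] for worker in workers}
    for message, worker in zip(messages, cycle(workers)):
        assignments[worker].append(message)
    return assignments


assignments = dispatch_to_group(["m1", "m2", "m3"], ["worker_a", "worker_b"])
# Every message appears in one worker's list and in no other.
```

Adding a worker to the group spreads the same message stream over more consumers, which is the usual way to scale a slow processing step.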
Hands-on: Up and Running in Three Minutes
Install
pip install langchain langchain-openai langchain-mq9
Minimal Example
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)
agent.invoke({"input": "Create a Mailbox to receive task results."})
The Toolkit includes six tools covering the full mq9 protocol:
| Tool | Description |
|---|---|
| CreateMailboxTool | Create a private Mailbox, returns a UUID |
| CreatePublicMailboxTool | Create a public Mailbox with a custom name as its address |
| SendMessageTool | Send a message to any Mailbox, with priority support |
| GetMessagesTool | Read messages from a Mailbox (full content) |
| ListMessagesTool | List message metadata in a Mailbox (msg_id, priority, ts), without the message body |
| DeleteMessageTool | Delete a specific message (optional operation) |
Full Scenario: DataAgent and AnalysisAgent
Here's a complete example of two agents communicating through mq9 in a decoupled way:
- DataAgent: fetches data, sends it to AnalysisAgent, and doesn't wait for results
- AnalysisAgent: watches the Mailbox and processes messages as they arrive
DataAgent
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
data_agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)
# Create a Mailbox, share the mail_id with other agents, send data, return immediately
data_agent.invoke({
    "input": """
1. Create a Mailbox (private or public), TTL 3600 seconds, record the returned mail_id
2. Send a normal-priority message to this Mailbox: 'dataset_v1: [record_1, record_2, record_3]'
3. Return immediately after sending; no need to wait for results
"""
})
AnalysisAgent
from langchain_mq9 import Mq9Toolkit
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
toolkit = Mq9Toolkit(server="nats://demo.robustmq.com:4222")
tools = toolkit.get_tools()
analysis_agent = initialize_agent(
    tools,
    ChatOpenAI(model="gpt-4o-mini"),
    agent=AgentType.OPENAI_FUNCTIONS,
    verbose=True,
)
# Two options: GetMessagesTool to fetch all messages at once, or subscribe for incremental delivery
analysis_agent.invoke({
    "input": """
Read messages from the Mailbox and complete the analysis.
Either fetch all existing messages at once (GetMessagesTool),
or subscribe for continuous incremental delivery.
"""
})
DataAgent returns immediately after sending and moves on to its next task. AnalysisAgent processes messages at its own pace. The two are fully decoupled.
Priority: Urgent Tasks First
mq9 has three priority levels built in — no extra configuration needed. Within the same Mailbox, critical messages are always consumed before urgent and normal ones.
# Send a critical task
agent.invoke({
    "input": "Send a critical-priority message to 'analysis-inbox': 'URGENT: system anomaly data requires immediate analysis'"
})
# Send a background task
agent.invoke({
    "input": "Send a normal-priority message to 'analysis-inbox': 'BACKGROUND: historical data archival analysis'"
})
Regardless of send order, AnalysisAgent always processes messages in critical → urgent → normal order.
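The ordering rule can be sketched with a small heap-based inbox in plain Python. `PriorityInbox` and `PRIORITY_RANK` are hypothetical names for illustration, and this is not mq9's implementation. The first-in, first-out order within a single priority level is an assumption; the text only states the order between levels.

```python
import heapq
from itertools import count

# Lower rank is consumed first.
PRIORITY_RANK = {"critical": 0, "urgent": 1, "normal": 2}


class PriorityInbox:
    def __init__(self):
        self._heap = []
        self._seq = count()  # tie-breaker: keeps arrival order within a level

    def send(self, body, priority="normal"):
        heapq.heappush(self._heap, (PRIORITY_RANK[priority], next(self._seq), body))

    def drain(self):
        # Pop everything in priority order, then arrival order.
        bodies = []
        while self._heap:
            _, _, body = heapq.heappop(self._heap)
            bodies.append(body)
        return bodies


inbox = PriorityInbox()
inbox.send("BACKGROUND: archival analysis", "normal")
inbox.send("URGENT: system anomaly", "critical")
inbox.send("refresh dashboard", "urgent")
inbox.send("BACKGROUND: weekly rollup", "normal")
ordered = inbox.drain()
```

The critical message was sent second but comes out first, and the two normal messages keep the order in which they were sent.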
Fan-out: Multiple Agents Subscribing to One Mailbox
Create a Mailbox and share the mail_id with multiple agents. Any number of agents can subscribe to it. The sender just drops messages into the Mailbox without knowing who's listening.
# DataAgent: create Mailbox and send message
data_agent.invoke({
    "input": """
Create a Mailbox, TTL 3600 seconds, record the returned mail_id.
Send this message to the Mailbox: 'analysis_result_v1: insights=[A, B, C]'
"""
})
# report_agent and monitor_agent are assumed to be built the same way as data_agent.
# In both prompts below, substitute the real Mailbox ID for {mail_id}.
# ReportAgent: subscribe to the same Mailbox
report_agent.invoke({
    "input": "Subscribe to the Mailbox with mail_id {mail_id}. Generate a report when a message arrives."
})
# MonitorAgent: also subscribe to this Mailbox
monitor_agent.invoke({
    "input": "Subscribe to the Mailbox with mail_id {mail_id}. Update the monitoring dashboard when a message arrives."
})
The sender sends once. Every agent subscribed to that Mailbox receives the message. New subscribers can be added at any time, and the sender needs no changes.
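Fan-out combined with store-first delivery can be modeled in plain Python as follows. `FanoutMailbox` is a hypothetical in-memory class, not part of langchain-mq9. It assumes subscribers are simple callbacks and leaves out TTL handling to keep the sketch short.

```python
class FanoutMailbox:
    def __init__(self):
        self._stored = []       # messages kept for later subscribers
        self._subscribers = []  # callbacks that receive live messages

    def send(self, body):
        # Persist first, then push a copy to everyone currently subscribed.
        self._stored.append(body)
        for callback in list(self._subscribers):
            callback(body)

    def subscribe(self, callback):
        # Replay stored history before registering for live messages.
        for body in self._stored:
            callback(body)
        self._subscribers.append(callback)


mailbox = FanoutMailbox()
report_inbox, monitor_inbox = [], []

mailbox.subscribe(report_inbox.append)       # ReportAgent subscribes early
mailbox.send("analysis_result_v1")
mailbox.subscribe(monitor_inbox.append)      # MonitorAgent joins late, gets replay
mailbox.send("analysis_result_v2")
```

Both lists end up with the same two messages in the same order, even though the second subscriber joined after the first message was sent. The sender's code is identical in either case.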
Direct Calls vs. mq9 Async Communication
| | Direct Call | mq9 Async |
|---|---|---|
| Coupling | Sender must know the receiver's interface | Sender only needs to know the Mailbox name |
| Availability | Receiver crash = sender failure | Receiver crash = messages persist and wait |
| Concurrency | Synchronous, blocking | Fire and forget, non-blocking |
| Scalability | One-to-one | One-to-many, add receivers anytime |
| Message history | Not supported | New subscribers automatically receive unexpired messages |
Complete Runnable Example
Two agents communicating asynchronously via mq9: DataAgent creates a Mailbox and sends messages; AnalysisAgent reads and processes them. No local deployment needed — connect directly to the demo server.
import asyncio
from langchain_mq9 import Mq9Toolkit, CreateMailboxTool, SendMessageTool, GetMessagesTool
from langchain.agents import AgentType, initialize_agent
from langchain_openai import ChatOpenAI
SERVER = "nats://demo.robustmq.com:4222"
async def main():
    llm = ChatOpenAI(model="gpt-4o-mini")
    toolkit = Mq9Toolkit(server=SERVER)
    tools = toolkit.get_tools()
    # 1. Create a Mailbox and get the mail_id
    mail_id = await CreateMailboxTool(server=SERVER)._arun(ttl=3600)
    print(f"Mailbox created: {mail_id}")
    # 2. DataAgent: send messages and return immediately
    data_agent = initialize_agent(
        tools, llm,
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=True,
    )
    await data_agent.ainvoke({"input": f"Send two messages to the Mailbox with mail_id {mail_id}: first with priority critical, content 'URGENT: anomaly detected in dataset_v1'; second with priority normal, content 'dataset_v2: [record_1, record_2, record_3]'"})
    print("DataAgent: messages sent, moving on to the next task...")
    # 3. AnalysisAgent: read all messages and process them
    # critical messages are returned first regardless of send order
    analysis_agent = initialize_agent(
        tools, llm,
        agent=AgentType.OPENAI_FUNCTIONS,
        verbose=True,
    )
    await analysis_agent.ainvoke({"input": f"Read all messages from the Mailbox with mail_id {mail_id}, output each message in priority order, and complete the analysis."})
asyncio.run(main())
Expected output:
Mailbox created: m-550e8400-e29b-41d4-a716-446655440000
DataAgent: messages sent, moving on to the next task...
> Entering new AgentExecutor chain...
> Using tool: GetMessagesTool
Found 2 message(s):
[1] priority=critical content=URGENT: anomaly detected in dataset_v1
[2] priority=normal content=dataset_v2: [record_1, record_2, record_3]
Critical messages are always returned first, regardless of send order.
When to Use mq9
Good fit:
- Clear producer/consumer relationship between agents
- Long processing tasks where you don't want upstream to wait
- Need buffering because the receiver has limited throughput
- One result needs to be sent to multiple agents
- Agents may go offline and messages need to be persisted
Probably don't need mq9:
- You need an immediate return value (use a direct call)
- Simple two-step flow with no concurrency requirements
Resources
- mq9 protocol spec: docs/mq9-protocol.md
- LangChain integration: langchain-mq9
- RobustMQ: github.com/robustmq/robustmq
- Demo server: nats://demo.robustmq.com:4222 (connect directly for testing, no local deployment needed)
