Skip to content

Connecting Two LangGraph Agents with mq9

The Problem

LangGraph handles single-Agent logic well — tool calls, state management, and conditional branching are all cleanly expressed inside a graph.

But what about collaboration between multiple Agents?

The common approach is to put them in the same process and use a Supervisor pattern for orchestration. That works for simple cases, but it has a fundamental limitation: all Agents must live in the same process — cross-service deployment is not possible.

In production, a ResearchAgent and a ReportAgent are often independent services, potentially running on different machines with their own resource configurations and scaling policies. The Supervisor pattern breaks down in that world.

mq9 solves exactly this — it lets two independent LangGraph Agents communicate asynchronously through Mailboxes, completely decoupled.


Architecture

img

LangGraph owns the internal logic of each Agent: tool calls, reasoning, and state transitions.

mq9 owns the message transport between Agents: cross-process, asynchronous, and durable when offline.


Installation

bash
pip install langgraph langchain-openai langchain-mq9

ReportAgent

ReportAgent is the service provider. On startup it registers to $SYSTEM.PUBLIC, continuously listens to its task Mailbox, processes incoming tasks through a LangGraph graph, and sends results back to the caller.

python
# report_agent.py
import asyncio
import json
from typing import TypedDict, Annotated
import operator

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from robustmq.mq9 import Client, Priority

SERVER = "nats://demo.robustmq.com:4222"
llm = ChatOpenAI(model="gpt-4o-mini")

# ── Tools ─────────────────────────────────────────────────
@tool
def format_report(title: str, content: str) -> str:
    """Format and generate a report"""
    return f"# {title}\n\n## Summary\n{content}\n\n## Conclusion\nAnalysis complete."

# ── LangGraph graph ───────────────────────────────────────
class ReportState(TypedDict):
    messages: Annotated[list, operator.add]
    query: str
    data: str
    report: str
    reply_to: str

inner_agent = create_react_agent(llm, [format_report])

def generate_report(state: ReportState):
    result = inner_agent.invoke({
        "messages": [HumanMessage(
            content=f"Generate a report about '{state['query']}' based on: {state['data']}"
        )]
    })
    return {"report": result["messages"][-1].content}

async def send_result(state: ReportState):
    async with Client(SERVER) as client:
        await client.send(
            state["reply_to"],
            json.dumps({"report": state["report"]}).encode(),
            priority=Priority.NORMAL
        )
    print("ReportAgent: report sent back")
    return {}

graph = StateGraph(ReportState)
graph.add_node("generate", generate_report)
graph.add_node("send", send_result)
graph.set_entry_point("generate")
graph.add_edge("generate", "send")
graph.add_edge("send", END)
report_app = graph.compile()

# ── Main ──────────────────────────────────────────────────
async def main():
    async with Client(SERVER) as client:
        # Create a public Mailbox with a fixed name; ResearchAgent sends to this address directly
        await client.create(
            ttl=86400,
            public=True,
            name="report-agent-inbox"
        )
        print("ReportAgent: Mailbox created, waiting for tasks...")

        async def on_task(msg):
            task = json.loads(msg.data)
            print(f"ReportAgent: received task — {task['query']}")
            await report_app.ainvoke({
                "messages": [],
                "query": task["query"],
                "data": task["data"],
                "report": "",
                "reply_to": task["reply_to"]
            })

        await client.subscribe("report-agent-inbox", on_task)
        await asyncio.sleep(float("inf"))  # run forever

asyncio.run(main())

ResearchAgent

ResearchAgent is the task initiator. It creates a private result Mailbox, runs its LangGraph search logic, sends the findings to ReportAgent, and then waits asynchronously for the report.

python
# research_agent.py
import asyncio
import json
from typing import TypedDict, Annotated
import operator

from langgraph.graph import StateGraph, END
from langgraph.prebuilt import create_react_agent
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.tools import tool
from robustmq.mq9 import Client, Priority

SERVER = "nats://demo.robustmq.com:4222"
llm = ChatOpenAI(model="gpt-4o-mini")

# ── Tools ─────────────────────────────────────────────────
@tool
def search_data(query: str) -> str:
    """Search for relevant data"""
    return f"Data about '{query}': [item1, item2, item3]"

# ── LangGraph graph ───────────────────────────────────────
class ResearchState(TypedDict):
    messages: Annotated[list, operator.add]
    query: str
    research_result: str
    reply_mailbox: str

inner_agent = create_react_agent(llm, [search_data])

def do_research(state: ResearchState):
    result = inner_agent.invoke({
        "messages": [HumanMessage(content=f"Search and summarize: {state['query']}")]
    })
    return {"research_result": result["messages"][-1].content}

async def send_to_report(state: ResearchState):
    async with Client(SERVER) as client:
        task = {
            "query": state["query"],
            "data": state["research_result"],
            "reply_to": state["reply_mailbox"]
        }
        await client.send(
            "report-agent-inbox",
            json.dumps(task).encode(),
            priority=Priority.NORMAL
        )
    print("ResearchAgent: sent to ReportAgent, continuing with other tasks...")
    return {}

graph = StateGraph(ResearchState)
graph.add_node("research", do_research)
graph.add_node("send", send_to_report)
graph.set_entry_point("research")
graph.add_edge("research", "send")
graph.add_edge("send", END)
research_app = graph.compile()

# ── Main ──────────────────────────────────────────────────
async def main():
    async with Client(SERVER) as client:
        # Create a private result Mailbox to receive ReportAgent's response
        reply_box = await client.create(ttl=300)
        print(f"ResearchAgent result Mailbox: {reply_box.mail_address}")

        # Run the LangGraph workflow
        await research_app.ainvoke({
            "messages": [],
            "query": "core features of the mq9 protocol",
            "research_result": "",
            "reply_mailbox": reply_box.mail_address
        })

        # Wait asynchronously for the report with a timeout
        print("ResearchAgent: waiting for report...")
        result_event = asyncio.Event()

        async def on_result(msg):
            result = json.loads(msg.data)
            print(f"\n===== Report received =====\n{result['report']}")
            result_event.set()

        sub = await client.subscribe(reply_box.mail_address, on_result)
        await asyncio.wait_for(result_event.wait(), timeout=30.0)
        await sub.unsubscribe()

asyncio.run(main())

Running

bash
# Terminal 1: start ReportAgent first
python report_agent.py

# Terminal 2: start ResearchAgent
python research_agent.py

ReportAgent terminal:

ReportAgent: Mailbox created, waiting for tasks...
ReportAgent: received task — core features of the mq9 protocol
ReportAgent: report sent back

ResearchAgent terminal:

ResearchAgent result Mailbox: mail@xxxx.xxxx
ResearchAgent: sent to ReportAgent, continuing with other tasks...
ResearchAgent: waiting for report...

===== Report received =====
# Core Features of the mq9 Protocol
...

Key Design Decisions

ReportAgent creates a public Mailbox: create(public=True, name="report-agent-inbox") creates a Mailbox with a fixed, well-known address. ResearchAgent knows this name in advance and sends directly to it.

ResearchAgent only creates a private Mailbox: It does not need to be discoverable — it only needs an address to receive the result. TTL is set to 300 seconds; it cleans itself up automatically.

reply_to pattern: The task payload carries a reply_to field telling ReportAgent where to send the result. This is the standard pattern for asynchronous request-reply.

The two Agents are fully decoupled: When ReportAgent is offline, tasks sent by ResearchAgent are persisted in the Mailbox. ReportAgent picks them up automatically when it comes back online.


Further Reading

If your Agents need to communicate using the A2A standard protocol, see Using mq9 for Reliable Async A2A Communication — the principle is the same; the message format is simply replaced with a standard A2A Task object.


Resources

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀