Skip to content

mq9 vs 现有方案:八个场景逐一对比

Agent 之间的通信,今天没有标准答案。大家都在用手边的工具绕——HTTP、Redis、Kafka、自研队列。能用,但每个场景都要加东西。

这篇文章把 mq9 要解决的八个核心场景拿出来,逐一对比现有方案怎么做、痛点在哪、mq9 怎么做、优势有多大。不吹不黑,摆事实。


场景一:子 Agent 完成任务,异步通知主 Agent

HTTP:子 Agent 回调主 Agent 的 webhook。主 Agent 必须在线且暴露端点。重启、网络抖动,回调丢了,需要自己做重试、幂等、超时处理。每个项目写一遍。

Redis:子 Agent publish 到 channel,主 Agent subscribe。主 Agent 不在线那一刻消息直接丢了。要做可靠投递,得自己加 Redis Stream 或 List,本质上在 Redis 上手搓消息队列。

Kafka:创建 topic,子 Agent produce,主 Agent consume。能用,但需要预先创建 topic、配置 partition、配置 retention、管理 consumer group 和 offset。一个简单的任务回传,引入了一整套 Kafka 运维。

mq9:主 Agent CREATE 邮箱拿到 mail_id,子 Agent PUB 到 $mq9.AI.INBOX.{mail_id}.normal。在线实时收到,不在线消息存着等来取。怀疑有遗漏,QUERY 兜底拉取。


场景二:主 Agent 感知所有子 Agent 状态

HTTP 轮询:主 Agent 定时请求每个子 Agent 的 /health 端点。需要维护 Agent 列表,新增要注册,消亡要清理。n 个 Agent 就是 n 个请求,O(n) 开销。间隔太长感知延迟大,太短浪费资源。

Redis:每个 Agent 写一个 key 带 TTL,主 Agent 用 SCAN 遍历。O(n) 操作,Agent 多了性能差。Redis 没有"订阅 key 变化"的可靠原生能力,只能轮询。

Kafka:每个 Agent 往 compacted topic 写状态。能实现,但清理不是实时的,有延迟。每次写入都是一条持久化日志,只为"最新状态"付出了全量持久化的代价。

mq9:Worker 申请 type=latest 的状态邮箱,定期 PUB 状态到自己的 INBOX,只保留最新一条。主 Agent 订阅感知全局。新 Agent 加入自动感知,超过 TTL 自动过期感知消亡。零维护。


场景三:任务广播,多个 Worker 竞争消费

HTTP:主 Agent 自己实现负载均衡,维护 Worker 列表,选一个发请求。Worker 挂了要重试换一个。自己写调度器,自己处理故障转移。

Redis:主 Agent LPUSH 到 List,Worker 用 BRPOP 竞争获取。最常见的方案,但没有消息确认——Worker BRPOP 拿走后崩溃,任务丢了,需要自己做可靠队列。没有优先级,没有广播能力。

Kafka:发到 topic,多个 consumer 在同一个 group 里竞争。能用,但 partition 数决定最大并行度,动态伸缩 Worker 需要 rebalance,rebalance 期间有消费停顿。

mq9PUB $mq9.AI.BROADCAST.task.available,Worker 用 queue group 竞争消费,只有一个抢到。Worker 崩溃消息自动重投。动态加减 Worker 零配置,无 rebalance。结果发回主 Agent 的 INBOX。


场景四:Agent 发现异常,广播告警

HTTP:发现异常的 Agent 需要知道谁有处理能力,维护订阅者列表,逐一 POST。列表维护本身就是分布式问题。

Redis:PUBLISH 到 channel,所有订阅者收到。能用,但不在线的收不到,没有持久化。告警发出那一刻处理方刚好重启,告警就丢了。

Kafka:发到 topic,每个处理方用独立 consumer group 消费。能实现一发多收,但每个处理方都要配 consumer group,topic 要预先创建。

mq9PUB $mq9.AI.BROADCAST.{domain}.anomaly。所有订阅 $mq9.AI.BROADCAST.*.anomaly 的 Agent 收到。发布方不需要知道谁在听。重要告警加 persist=true,不在线的上线后也能收到。


场景五:云端给离线边缘 Agent 发指令

HTTP:直接失败。边缘不在线,请求超时。需要自建重试队列、指数退避、持久化未发送指令。本质上在云端手搓消息队列。

Redis:写到 List 等边缘上线消费。但没有优先级,紧急指令和普通指令混在一起排队。要做优先级得用 Sorted Set,复杂度上去了。Redis 是内存数据库,积压量大成本高。

Kafka:写到 topic,边缘上线后消费。能用,但没有原生优先级——partition 内严格 FIFO,紧急指令要走单独 topic,消费端要同时订阅多个 topic 自己做优先级调度。

mq9INBOX.{edge_mail_id}.urgentINBOX.{edge_mail_id}.normal 分优先级发送。边缘上线后 urgent 先处理,normal 后处理。QUERY 兜底拉取确保不遗漏。RocksDB 持久化,成本远低于 Redis 内存存储。


场景六:人机混合工作流

HTTP + 前端:Agent 调审批服务 API 创建审批单,前端轮询或 WebSocket 推送给审批员,审批员操作后回调 Agent。至少三个系统对接:Agent → 审批服务 → 前端 → 审批服务 → Agent。

Redis:Agent 写审批请求到 key,审批员客户端轮询,审批后写结果到另一个 key,Agent 轮询结果。两端都在轮询,延迟高,逻辑分散。

Kafka:审批请求发一个 topic,审批结果发另一个 topic。能用,但审批员需要 Kafka 消费客户端,对非技术人员不友好。

mq9:Agent PUB 到 INBOX.{审批员mail_id}.urgent,审批员订阅自己 INBOX 收到请求,审批后 PUB 到 Agent 的 INBOX。人类和 Agent 用完全相同的协议,不需要额外审批服务。correlation_id 保证配对。


场景七:Agent A 向 Agent B 提问,B 可能不在线

HTTP:B 不在线直接失败。要做异步,得引入回调 URL、correlation_id、超时重试、状态机——复杂度爆炸。

Redis:A 写请求到 B 的 List,B 处理后写结果到 A 的 List。两个 List,两端轮询,自己管 correlation_id 配对。能用,代码量不小。

Kafka:A 发请求到 request topic 带 reply_to header,B 消费处理后发到 reply topic。两个 topic、两个 consumer group,A 还要从 reply topic 按 correlation_id 过滤属于自己的响应。

mq9:A PUB 到 INBOX.{B的mail_id}.normal,payload 带 reply_to 和 correlation_id。B 上线处理后 PUB 到 INBOX.{A的mail_id}.normal。NATS 原生 reply-to 支持,B 不在线请求在邮箱等着,不丢。


场景八:Agent 注册能力,其他 Agent 发现它

HTTP + 注册中心:需要中心化的服务注册中心(Consul、Eureka、自建)。Agent 启动注册,消亡注销或靠健康检查剔除。引入额外有状态服务,本身需要高可用部署。

Redis:Agent 把能力写到 Hash 或 Set 带 TTL,查询方 SCAN 或 SMEMBERS。能用,但没有订阅能力变化的机制,只能轮询。Agent 多了 SCAN 性能差。

Kafka:能力声明写到 compacted topic,查询方消费整个 topic 构建本地视图。延迟高,实现复杂,为简单的服务发现引入了完整的 Kafka 消费链路。

mq9:Agent 申请 type=latest 的邮箱,能力声明 PUB 到自己的 INBOX。其他 Agent 订阅感知,或通过 BROADCAST.capability.query 主动查询,有能力的 Agent 响应到发现方 INBOX。去中心化,不需要注册中心,Agent 消亡 TTL 自动清理。


总结

场景HTTPRedisKafkamq9mq9 优势
子 Agent 完成任务,异步通知主 Agent需对方在线,自建重试不持久化,需手搓可靠队列能用,运维成本高CREATE + INBOX + QUERY 兜底
主 Agent 感知所有子 Agent 状态轮询 O(n)SCAN 轮询compacted topic,延迟高latest 邮箱,实时订阅,TTL 自动过期
任务广播,多个 Worker 竞争消费自建调度BRPOP 不可靠rebalance 有停顿BROADCAST + queue group,零配置
Agent 发现异常,广播告警维护订阅列表不持久化需多 consumer groupBROADCAST + 可选持久化
云端给离线边缘 Agent 发指令直接失败无优先级,内存成本高无原生优先级INBOX 三级优先级 + QUERY 兜底
Agent 发审批请求,人类审批后继续流程至少三系统对接两端轮询审批员需 Kafka 客户端人机同一协议,INBOX 双向通信
Agent A 向 Agent B 提问,B 可能不在线不在线直接失败两 List + 两端轮询两 topic + 过滤响应INBOX + reply-to 原生,离线不丢
Agent 注册能力,其他 Agent 发现并调用需注册中心SCAN 轮询compacted topic,实现复杂latest 邮箱 + BROADCAST 查询,零额外服务

八个场景,4 个高优势、3 个中优势、1 个低优势。mq9 的核心价值不是某个维度的碾压,而是每个场景都不需要绕路——四个命令字直接覆盖,零额外逻辑。

需要说明的是:mq9 目前处于早期阶段,生态成熟度远不及以上任何一个方案。如果你的场景是大数据管道和高吞吐流处理,Kafka 仍然是最优解。以上对比针对的是 Agent 异步通信这个特定场景。


邮箱行为映射

现实邮箱的完整行为,对应 mq9 四个命令字:

现实邮箱mq9说明
开通邮箱MAILBOX.CREATE拿到 mail_id,可选 standard(累积)或 latest(只留最新)
寄信INBOX.{mail_id}.{priority}对方在不在家不管,分急件/普通件/广告
收信SUB $mq9.AI.INBOX.{mail_id}.*在线时服务端推送,急件先看
去邮局查信MAILBOX.QUERY.{mail_id}兜底拉取,怕有遗漏
回信NATS 原生 reply-to信里有对方回信地址
邮箱过期TTL 自动销毁长期没人用,邮局自动注销
小区广播BROADCAST.{domain}.{event}公告栏,贴上去所有人能看
只看某类公告通配符订阅$mq9.AI.BROADCAST.*.anomaly
抢活queue group公告栏贴了一个活,只能一个人接

Java 代码示例

八个场景,八个函数,全部基于 NATS 原生 Java SDK,零自定义封装。

java
// 依赖: io.nats:jnats:2.20.5

import io.nats.client.*;
import java.time.Duration;

public class Mq9Scenarios {

    // ============================================================
    // 场景一:子 Agent 完成任务,异步通知主 Agent
    // ============================================================
    static void scenario1_AsyncTaskReturn(Connection nc) throws Exception {
        // 主 Agent:订阅自己的邮箱
        Dispatcher mainAgent = nc.createDispatcher((msg) -> {
            System.out.println("[主Agent] 收到任务结果: " + new String(msg.getData()));
        });
        mainAgent.subscribe("$mq9.AI.INBOX.main-agent.*");

        // 子 Agent:任务完成,发结果到主 Agent 邮箱
        nc.publish("$mq9.AI.INBOX.main-agent.normal",
                "{\"from\":\"worker-001\",\"task_id\":\"t-001\",\"result\":\"分析完成,异常率2.3%\"}".getBytes());
    }

    // ============================================================
    // 场景二:主 Agent 感知所有子 Agent 状态
    // ============================================================
    static void scenario2_GlobalStatusAwareness(Connection nc) throws Exception {
        // 主 Agent:订阅所有 Worker 的状态邮箱
        Dispatcher mainAgent = nc.createDispatcher((msg) -> {
            System.out.println("[状态感知] " + msg.getSubject() + " -> " + new String(msg.getData()));
        });
        // 订阅所有 Worker 状态邮箱(约定前缀 status-)
        mainAgent.subscribe("$mq9.AI.INBOX.status-*.*");

        // Worker 们各自上报状态到自己的 latest 邮箱
        nc.publish("$mq9.AI.INBOX.status-worker-001.normal",
                "{\"status\":\"running\",\"load\":0.3,\"capabilities\":[\"data.analysis\"]}".getBytes());
        nc.publish("$mq9.AI.INBOX.status-worker-002.normal",
                "{\"status\":\"idle\",\"load\":0.0,\"capabilities\":[\"text.generation\"]}".getBytes());
    }

    // ============================================================
    // 场景三:任务广播,多个 Worker 竞争消费
    // ============================================================
    static void scenario3_TaskBroadcastCompete(Connection nc) throws Exception {
        // 三个 Worker 加入同一个 queue group,竞争消费
        for (int i = 1; i <= 3; i++) {
            final int workerId = i;
            Dispatcher worker = nc.createDispatcher((msg) -> {
                System.out.println("[Worker-" + workerId + "] 抢到任务: " + new String(msg.getData()));
            });
            worker.subscribe("$mq9.AI.BROADCAST.task.available", "task.workers");
        }

        Thread.sleep(200);

        // 主 Agent 广播三个任务
        for (int i = 1; i <= 3; i++) {
            nc.publish("$mq9.AI.BROADCAST.task.available",
                    ("{\"task_id\":\"t-00" + i + "\",\"type\":\"data_analysis\"}").getBytes());
        }
    }

    // ============================================================
    // 场景四:Agent 发现异常,广播告警
    // ============================================================
    static void scenario4_AnomalyBroadcast(Connection nc) throws Exception {
        // 告警处理 Agent:订阅所有域的异常事件
        Dispatcher alertHandler1 = nc.createDispatcher((msg) -> {
            System.out.println("[告警处理A] 收到告警: " + new String(msg.getData()));
        });
        alertHandler1.subscribe("$mq9.AI.BROADCAST.*.anomaly");

        // 另一个只关心 payment 域
        Dispatcher alertHandler2 = nc.createDispatcher((msg) -> {
            System.out.println("[告警处理B-支付专用] 收到告警: " + new String(msg.getData()));
        });
        alertHandler2.subscribe("$mq9.AI.BROADCAST.payment.anomaly");

        Thread.sleep(200);

        // 某个 Agent 发现异常,广播出去
        nc.publish("$mq9.AI.BROADCAST.payment.anomaly",
                "{\"from\":\"monitor-agent\",\"level\":\"critical\",\"detail\":\"支付超时率突增到15%\"}".getBytes());
    }

    // ============================================================
    // 场景五:云端给离线边缘 Agent 发指令
    // ============================================================
    static void scenario5_EdgeOfflineMessage(Connection nc) throws Exception {
        // 云端发送指令到边缘 Agent 邮箱(边缘可能不在线)
        nc.publish("$mq9.AI.INBOX.edge-device-001.urgent",
                "{\"from\":\"cloud\",\"command\":\"emergency_stop\",\"reason\":\"温度过高\"}".getBytes());
        nc.publish("$mq9.AI.INBOX.edge-device-001.normal",
                "{\"from\":\"cloud\",\"command\":\"update_config\",\"config\":{\"interval\":30}}".getBytes());
        nc.publish("$mq9.AI.INBOX.edge-device-001.notify",
                "{\"from\":\"cloud\",\"info\":\"固件v2.1已发布\"}".getBytes());

        System.out.println("[云端] 指令已发送,等待边缘设备上线...");

        // 边缘 Agent 上线后,订阅自己邮箱,按优先级收取
        Thread.sleep(1000);
        Dispatcher edgeAgent = nc.createDispatcher((msg) -> {
            String priority = msg.getSubject().substring(msg.getSubject().lastIndexOf('.') + 1);
            System.out.println("[边缘设备] 收到[" + priority + "]: " + new String(msg.getData()));
        });
        edgeAgent.subscribe("$mq9.AI.INBOX.edge-device-001.*");

        // 兜底:QUERY 拉取可能遗漏的消息
        Message queryReply = nc.request("$mq9.AI.MAILBOX.QUERY.edge-device-001",
                "{\"token\":\"tok-edge-001\"}".getBytes(), Duration.ofSeconds(3));
        if (queryReply != null) {
            System.out.println("[边缘设备] QUERY 兜底: " + new String(queryReply.getData()));
        }
    }

    // ============================================================
    // 场景六:人机混合工作流
    // ============================================================
    static void scenario6_HumanApproval(Connection nc) throws Exception {
        // 人类审批员:订阅自己的邮箱
        Dispatcher approver = nc.createDispatcher((msg) -> {
            System.out.println("[审批员] 收到审批请求: " + new String(msg.getData()));
            // 审批通过,结果发回 Agent 邮箱
            nc.publish("$mq9.AI.INBOX.agent-001.normal",
                    "{\"from\":\"approver-zhang\",\"approved\":true,\"comment\":\"同意,注意风控\",\"correlation_id\":\"approval-001\"}".getBytes());
            System.out.println("[审批员] 已审批通过");
        });
        approver.subscribe("$mq9.AI.INBOX.approver-zhang.*");

        // Agent:订阅自己的邮箱等待审批结果
        Dispatcher agent = nc.createDispatcher((msg) -> {
            System.out.println("[Agent] 收到审批结果: " + new String(msg.getData()));
            System.out.println("[Agent] 审批通过,继续执行流程...");
        });
        agent.subscribe("$mq9.AI.INBOX.agent-001.*");

        Thread.sleep(200);

        // Agent 发起审批请求
        nc.publish("$mq9.AI.INBOX.approver-zhang.urgent",
                "{\"from\":\"agent-001\",\"type\":\"approval_request\",\"correlation_id\":\"approval-001\",\"content\":\"申请调用外部API,预计费用$50\"}".getBytes());
    }

    // ============================================================
    // 场景七:Agent A 向 Agent B 提问,B 可能不在线
    // ============================================================
    static void scenario7_AsyncRequestReply(Connection nc) throws Exception {
        // Agent B:订阅自己的邮箱,处理请求后回复
        Dispatcher agentB = nc.createDispatcher((msg) -> {
            System.out.println("[Agent-B] 收到提问: " + new String(msg.getData()));
            if (msg.getReplyTo() != null) {
                nc.publish(msg.getReplyTo(),
                        "{\"from\":\"agent-B\",\"answer\":\"当前进度72%,预计还需3分钟\",\"correlation_id\":\"req-001\"}".getBytes());
                System.out.println("[Agent-B] 已回复");
            }
        });
        agentB.subscribe("$mq9.AI.INBOX.agent-B.*");

        Thread.sleep(200);

        // Agent A:发请求并等待回复
        System.out.println("[Agent-A] 向 Agent-B 提问...");
        Message reply = nc.request("$mq9.AI.INBOX.agent-B.normal",
                "{\"from\":\"agent-A\",\"type\":\"request\",\"question\":\"当前进度多少?\",\"correlation_id\":\"req-001\"}".getBytes(),
                Duration.ofSeconds(3));

        if (reply != null) {
            System.out.println("[Agent-A] 收到回复: " + new String(reply.getData()));
        } else {
            System.out.println("[Agent-A] 超时未收到回复,B 可能不在线,消息已在邮箱等待");
        }
    }

    // ============================================================
    // 场景八:Agent 注册能力,其他 Agent 发现并调用
    // ============================================================
    static void scenario8_CapabilityDiscovery(Connection nc) throws Exception {
        // 编排器:订阅 Worker 的 latest 状态邮箱
        Dispatcher orchestrator = nc.createDispatcher((msg) -> {
            System.out.println("[编排器] 发现 Agent 能力: " + new String(msg.getData()));
        });
        orchestrator.subscribe("$mq9.AI.INBOX.status-*.*");

        // 也可以通过广播主动查询
        Dispatcher queryResponder = nc.createDispatcher((msg) -> {
            System.out.println("[Worker-001] 响应能力查询");
            nc.publish("$mq9.AI.INBOX.orchestrator.normal",
                    "{\"from\":\"worker-001\",\"capabilities\":[\"data.analysis\"],\"load\":0.3}".getBytes());
        });
        queryResponder.subscribe("$mq9.AI.BROADCAST.capability.query");

        Thread.sleep(200);

        // Worker 上报能力到自己的 latest 邮箱
        nc.publish("$mq9.AI.INBOX.status-worker-001.normal",
                "{\"status\":\"idle\",\"capabilities\":[\"data.analysis\",\"report.generation\"]}".getBytes());
        nc.publish("$mq9.AI.INBOX.status-worker-002.normal",
                "{\"status\":\"running\",\"capabilities\":[\"text.generation\"]}".getBytes());

        Thread.sleep(200);

        // 编排器广播查询:谁有 data.analysis 能力?
        System.out.println("[编排器] 广播查询: 谁有 data.analysis 能力?");
        nc.publish("$mq9.AI.BROADCAST.capability.query",
                "{\"from\":\"orchestrator\",\"need\":\"data.analysis\"}".getBytes());
    }

    // ============================================================
    // main
    // ============================================================
    public static void main(String[] args) throws Exception {
        Connection nc = Nats.connect("nats://localhost:4222");

        System.out.println("=== 场景一:异步任务回传 ===");
        scenario1_AsyncTaskReturn(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景二:全局状态感知 ===");
        scenario2_GlobalStatusAwareness(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景三:任务广播与竞争消费 ===");
        scenario3_TaskBroadcastCompete(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景四:异常告警广播 ===");
        scenario4_AnomalyBroadcast(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景五:边缘离线消息积压 ===");
        scenario5_EdgeOfflineMessage(nc);
        Thread.sleep(2000);

        System.out.println("\n=== 场景六:人机混合工作流 ===");
        scenario6_HumanApproval(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景七:异步请求-响应 ===");
        scenario7_AsyncRequestReply(nc);
        Thread.sleep(500);

        System.out.println("\n=== 场景八:能力注册与发现 ===");
        scenario8_CapabilityDiscovery(nc);
        Thread.sleep(1000);

        nc.close();
    }
}
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀