Skip to content

RobustMQ 里程碑:统一存储,多协议原生消费,跑通了

今天,RobustMQ 完成了一个里程碑:一条消息通过 MQTT 写入,MQTT、Kafka、NATS、AMQP 四个协议同时消费,底层一份数据,零桥接,零复制。

这件事对我们的意义是:我们确认了一件最重要的事,就是这套架构能撑住这个目标,后续不需要推倒重来。这意味着第一阶段——架构设计与代码打磨——基本收尾。

跑通了什么

一条消息,通过 MQTT 写入。不做任何桥接,不做协议转发,不复制数据——MQTT、Kafka、NATS、AMQP 四个协议的消费端,同时收到了同一条消息。

img

执行命令如下:

bash
# MQTT 发布
mqttx pub -t 'robustmq.multi.protocol' -h '127.0.0.1' -p 1883 -q 1 \
  -u 'admin' -P 'robustmq' -m 'robustmq multi protocol success'

四端同时消费,各用各自原生的工具和 SDK:

bash
# MQTT 消费
mqttx sub -t 'robustmq.multi.protocol' -h '127.0.0.1' -p 1883 -q 2 -u 'admin' -P 'robustmq'

# Kafka 消费
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic robustmq.multi.protocol --partition 0 --from-beginning

# NATS 消费
nats sub "robustmq.multi.protocol" --server 127.0.0.1

# AMQP 消费
python3 robustmq/scripts/amqp_consumer.py

底层是一份数据,写入统一的 Shard 存储层,四种协议各自按自己的语义读取。没有数据复制,没有路由转发,没有写放大,没有新概念需要学习。

原生 SDK,原生命令,原生语义。你不需要学任何 RobustMQ 专有的东西,理解成本为零。单个二进制,零外部依赖。


这件事为什么难

传统的多协议方案,走的是并行多Broker或"路由 + 桥接"的路子。比如消息进来,路由引擎根据规则转发到不同的存储后端,每个协议维护自己的数据副本。MQTT 的数据要给 Kafka 消费,就得复制一份写进去。消费端越多,写放大越严重,运维复杂度随之叠加。

RobustMQ 从架构底层选了一条不同的路:统一存储层,协议只是读写接口。消息写入一次,进入 Shard 存储。MQTT、Kafka、NATS、AMQP 各自作为协议层,按照自己的语义从同一份存储里读取。写一次,任意协议消费,数据不复制,存储不冗余。

这条路在设计上更干净,但实现上更难。每种协议都有自己的语义——Kafka 有 offset、partition,MQTT 有 QoS、retain,NATS 有 subject、queue group,AMQP 有 exchange、binding。要让这些协议在同一份存储上各自表现正确,需要在存储抽象层做大量设计工作。三年里推翻过架构,重构过核心模块,否定过自己以为想清楚的设计。每一次推翻都让架构更干净,每一次重构都让代码更扎实。

今天这个 Demo,是这条路走通的第一个信号。


坦诚说,这是验证,不是完成

这次只完成了 MQTT 写入,四个协议基础订阅消费。目的是验证架构的可行性,不是展示完整能力。

项目现状如实说:

  • 整体架构趋于稳定,核心抽象基本固定
  • MQTT 核心功能已基本完备,还在持续打磨
  • Kafka / NATS / AMQP 协议解析完成,但各协议功能都只是起步,后续逐步补全

Kafka、NATS、AMQP 还有很长的路要走。我们展示的是方向,不是终点。基础软件不能靠包装,只能靠积累。


我们想要的未来

这次验证真正让我们兴奋的,不只是四个协议跑通了。

而是我们确认了一件事:在这套架构上,增加一个新协议,在架构层面的代价是很轻的。 1 个协议,5 个,100 个,存储层不需要动。协议只是读写接口,新增协议只是加一层解析。

这个性质,在工业和边缘场景里尤其重要。工厂里的协议生态极度碎片化——OPC-UA、Modbus、CoAP、MQTT-SN,每个行业、每个设备厂商都有自己的一套。传统方案面对这种碎片化只能靠堆桥接、堆适配器,运维复杂度随协议数量线性膨胀。RobustMQ 的架构在这里有根本性的优势:不管外面的协议世界多乱,底层存储只有一份,新增协议只是加一层协议解析,不触碰核心。

AI Agent 通信也是同样的逻辑。Agent 之间需要什么样的通信协议,现在还没有定论。MCP 是一个方向,我们自己也在探索基于 NATS 扩展的 $mq9.AI.API.* Subject 空间。未来可能还会有我们现在想不到的新协议出现。这套架构给了我们从容探索的空间——不需要为每一个新协议重新设计存储,不需要担心加新东西会破坏已有的稳定性。

消息系统这个领域,Kafka 用了十年才成为数据管道的事实标准,MQTT 用了二十年才在 IoT 领域站稳。基础软件没有捷径。我们不知道 RobustMQ 最终会走多远,但值得做的事,就该认真做。


复现步骤

参考 快速安装文档 运行服务。

1. MQTT 发布

bash
mqttx pub -t 'robustmq.multi.protocol' -h '127.0.0.1' -p 1883 -q 1 \
  -u 'admin' -P 'robustmq' -m 'robustmq multi protocol success'

2. MQTT 消费

bash
mqttx sub -t 'robustmq.multi.protocol' -h '127.0.0.1' -p 1883 -q 2 \
  -u 'admin' -P 'robustmq'

3. Kafka 消费

bash
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 \
  --topic robustmq.multi.protocol --partition 0 --from-beginning

4. AMQP 消费

bash
python3 robustmq/scripts/amqp_consumer.py

5. NATS 消费

bash
nats sub "robustmq.multi.protocol" --server 127.0.0.1
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀