RobustMQ MQTT 存储模型
离线消息
默认情况下,客户端订阅 Topic 中的数据采用内存分发机制。当 Broker 接收到 Topic 数据时,会先判断该 Topic 是否已被订阅。如果已被订阅,则直接将消息推送给订阅者;如果未被订阅,该消息将被丢弃。
这种特性适用于 MQTT 的常见应用场景,即只保留最新数据。例如,传感器设备离线后重新上线时,只需要接收最新的控制指令,无需接收历史指令。
然而,在许多实际应用场景中,订阅者下线后重新上线时,需要接收离线期间产生的所有数据,并按顺序进行消费。此时,默认的处理策略无法满足需求,因此引入了离线消息的概念。
离线消息是指当 Topic 收到数据时,无论是否有订阅者在线,数据都会被持久化保存。当订阅者重新上线后,系统会按顺序将离线期间的数据推送给客户端。
RobustMQ 支持离线消息特性,允许通过配置开启该功能。离线消息配置分为 Topic 和 Cluster 两个级别,Topic 级别的配置优先级高于 Cluster 级别。
[mqtt_offline_message]
enable = true读写模型
MQTT 的读写模型如下图所示。在集群部署架构中,每个客户端都可以连接到任意一个 Broker 节点,并向任何 Topic 发送或订阅消息。例如,客户端可以在 Broker1、Broker2、Broker3 上发送和订阅消息。因此,从技术角度来看,每个 Broker 节点都需要能够读写完整的 Topic 数据。

这意味着,当数据写入 Broker1 后,连接到 Broker2 或 Broker3 的客户端也应该能够订阅到该 Topic 的最新数据。因此,在集群部署架构下,需要解决数据在多节点间的同步问题。
从技术实现角度看,主要有两种思路:
- 将数据分发到所有 Broker 节点
- 通过统一的存储引擎实现集中式读写
这两种方案本质上要解决的问题相同,实现思路也基本一致。考虑到 RobustMQ 支持插件化的存储模型,并将在后续支持多种存储引擎,技术实现上选择了第二种方案。
存储模型
在单机部署模式下,RobustMQ MQTT 支持 Memory 和 RocksDB 两种存储引擎。需要注意的是,这两种存储引擎仅适用于单机部署场景。如果在集群模式下使用这两种引擎,会导致订阅者无法接收到消息,因为 Topic 的消息不会在节点间进行同步。
在单机部署场景下,Memory 和 RocksDB 两种存储引擎的主要区别在于是否支持持久化。由于单机模式通常仅用于开发和测试环境,而非生产环境,因此持久化特性的差异并不显著。
设计思考:实际上,同时支持 Memory 和 RocksDB 两种引擎的必要性有待商榷。对于单机测试场景,RocksDB 引擎已经能够满足大多数需求,既支持持久化,又能覆盖多种测试场景。因此,后续版本可能会移除 Memory 类型。至于是否为这两种引擎增加节点间同步功能,这将引入数据一致性的复杂性,与存算分离的架构理念相冲突,因此不作考虑。
在集群部署模式下,RobustMQ MQTT 支持 Journal Engine、S3、MySQL、Redis 等远程存储引擎。数据会直接写入这些远程存储系统,使得所有 Broker 节点都能访问完整的数据。
集群部署架构必须依赖某个远程存储系统来实现节点间的数据同步。为了降低对外部存储系统的依赖,RobustMQ 提供了内置的分布式存储引擎 Journal Engine,通过它来实现数据在节点间的同步。

在集群模式下,数据流转过程如下:
数据写入流程:
- 客户端连接到任意一台 Broker 节点
- Broker 通过 Storage Adapter(插件化存储层)将数据写入远程存储(如 Journal Engine)
- 远程存储写入完成后,Broker 向客户端返回写入成功响应
数据消费流程:
- 客户端连接到任意一台 Broker 节点订阅数据
- Broker 通过 Storage Adapter(插件化存储层)从远程存储(如 Journal Engine)读取数据
- Broker 将读取的数据推送给客户端
综上所述,对于单机测试场景,实际上只需要保留 RocksDB 这一种存储引擎即可。它既支持持久化,又能够满足各种测试场景的需求。
MQTT 与 Journal Engine
在集群模式下,RobustMQ MQTT 至少需要启用 Journal Engine 作为存储引擎,即将存储层配置为 Journal Engine,这样系统才能正常工作。当然,也可以根据实际需求更换为其他存储引擎,例如 S3。
Journal Engine 专为 MQTT 的快速分发场景设计,支持两种存储机制:
内存存储:
- 数据写入到 Journal Engine 某个节点的内存中
- 直接从该节点读取数据,数据不进行持久化
- 适用于类似 NATS 的快速分发场景,性能最优
文件存储:
- 数据持久化存储到磁盘文件
- 适用于需要数据可靠性保证的场景
用户可以根据实际需求配置 Journal Engine 的存储模式,在高性能和数据持久化之间进行权衡。内存存储模式主要解决快速分发的性能问题,而文件存储模式则保证数据的可靠性。
数据映射关系:
- 一个 MQTT Topic 对应 Journal Engine 中的一个 Shard
- Topic 数据在 Shard 中按顺序存储
- 写入和读取均按顺序进行,保证消息的有序性
