Skip to content

RobustMQ MQTT 存储模型

离线消息

默认情况下,客户端订阅 Topic 中的数据采用内存分发机制。当 Broker 接收到 Topic 数据时,会先判断该 Topic 是否已被订阅。如果已被订阅,则直接将消息推送给订阅者;如果未被订阅,该消息将被丢弃。

这种特性适用于 MQTT 的常见应用场景,即只保留最新数据。例如,传感器设备离线后重新上线时,只需要接收最新的控制指令,无需接收历史指令。

然而,在许多实际应用场景中,订阅者下线后重新上线时,需要接收离线期间产生的所有数据,并按顺序进行消费。此时,默认的处理策略无法满足需求,因此引入了离线消息的概念。

离线消息是指当 Topic 收到数据时,无论是否有订阅者在线,数据都会被持久化保存。当订阅者重新上线后,系统会按顺序将离线期间的数据推送给客户端。

RobustMQ 支持离线消息特性,允许通过配置开启该功能。离线消息配置分为 Topic 和 Cluster 两个级别,Topic 级别的配置优先级高于 Cluster 级别。

[mqtt_offline_message]
enable = true

读写模型

MQTT 的读写模型如下图所示。在集群部署架构中,每个客户端都可以连接到任意一个 Broker 节点,并向任何 Topic 发送或订阅消息。例如,客户端可以在 Broker1、Broker2、Broker3 上发送和订阅消息。因此,从技术角度来看,每个 Broker 节点都需要能够读写完整的 Topic 数据。

img

这意味着,当数据写入 Broker1 后,连接到 Broker2 或 Broker3 的客户端也应该能够订阅到该 Topic 的最新数据。因此,在集群部署架构下,需要解决数据在多节点间的同步问题。

从技术实现角度看,主要有两种思路:

  1. 将数据分发到所有 Broker 节点
  2. 通过统一的存储引擎实现集中式读写

这两种方案本质上要解决的问题相同,实现思路也基本一致。考虑到 RobustMQ 支持插件化的存储模型,并将在后续支持多种存储引擎,技术实现上选择了第二种方案。

存储模型

在单机部署模式下,RobustMQ MQTT 支持 Memory 和 RocksDB 两种存储引擎。需要注意的是,这两种存储引擎仅适用于单机部署场景。如果在集群模式下使用这两种引擎,会导致订阅者无法接收到消息,因为 Topic 的消息不会在节点间进行同步。

在单机部署场景下,Memory 和 RocksDB 两种存储引擎的主要区别在于是否支持持久化。由于单机模式通常仅用于开发和测试环境,而非生产环境,因此持久化特性的差异并不显著。

设计思考:实际上,同时支持 Memory 和 RocksDB 两种引擎的必要性有待商榷。对于单机测试场景,RocksDB 引擎已经能够满足大多数需求,既支持持久化,又能覆盖多种测试场景。因此,后续版本可能会移除 Memory 类型。至于是否为这两种引擎增加节点间同步功能,这将引入数据一致性的复杂性,与存算分离的架构理念相冲突,因此不作考虑。

在集群部署模式下,RobustMQ MQTT 支持 Journal Engine、S3、MySQL、Redis 等远程存储引擎。数据会直接写入这些远程存储系统,使得所有 Broker 节点都能访问完整的数据。

集群部署架构必须依赖某个远程存储系统来实现节点间的数据同步。为了降低对外部存储系统的依赖,RobustMQ 提供了内置的分布式存储引擎 Journal Engine,通过它来实现数据在节点间的同步。

img

在集群模式下,数据流转过程如下:

数据写入流程

  • 客户端连接到任意一台 Broker 节点
  • Broker 通过 Storage Adapter(插件化存储层)将数据写入远程存储(如 Journal Engine)
  • 远程存储写入完成后,Broker 向客户端返回写入成功响应

数据消费流程

  • 客户端连接到任意一台 Broker 节点订阅数据
  • Broker 通过 Storage Adapter(插件化存储层)从远程存储(如 Journal Engine)读取数据
  • Broker 将读取的数据推送给客户端

综上所述,对于单机测试场景,实际上只需要保留 RocksDB 这一种存储引擎即可。它既支持持久化,又能够满足各种测试场景的需求。

MQTT 与 Journal Engine

在集群模式下,RobustMQ MQTT 至少需要启用 Journal Engine 作为存储引擎,即将存储层配置为 Journal Engine,这样系统才能正常工作。当然,也可以根据实际需求更换为其他存储引擎,例如 S3。

Journal Engine 专为 MQTT 的快速分发场景设计,支持两种存储机制:

内存存储

  • 数据写入到 Journal Engine 某个节点的内存中
  • 直接从该节点读取数据,数据不进行持久化
  • 适用于类似 NATS 的快速分发场景,性能最优

文件存储

  • 数据持久化存储到磁盘文件
  • 适用于需要数据可靠性保证的场景

用户可以根据实际需求配置 Journal Engine 的存储模式,在高性能和数据持久化之间进行权衡。内存存储模式主要解决快速分发的性能问题,而文件存储模式则保证数据的可靠性。

数据映射关系

  • 一个 MQTT Topic 对应 Journal Engine 中的一个 Shard
  • Topic 数据在 Shard 中按顺序存储
  • 写入和读取均按顺序进行,保证消息的有序性