Skip to content

存储层 Storage Adapter 架构

设计理念

RobustMQ 存储层采用可插拔架构,集群支持多种底层存储引擎,业务可以根据实际需求灵活选择。核心实现是通过 Storage Adapter 层,使用 Rust 的 Trait 定义底层存储引擎需要具备的能力。只要实现了这些 Trait,任何存储系统都可以作为 RobustMQ 的底层存储引擎,并且支持运行时动态选择和切换。

支持的存储引擎

存储引擎分为本地存储和远程存储两大类。

本地存储引擎

本地存储主要用于单机部署和开发测试场景。目前支持 Memory 和 RocksDB 两种引擎。Memory 引擎将数据完全存储在内存中,适合临时数据和高速缓存场景。RocksDB 是一个持久化的 KV 存储引擎,适合需要持久化的开发测试环境。Local File 引擎目前还在规划中。

本地存储的特点是部署简单,适合单机和测试模式,但不支持多节点和多副本容灾,也就是说数据可靠性完全依赖单机的可靠性。

远程存储引擎

远程存储用于集群模式和生产环境。Journal Engine 是 RobustMQ 自研的分布式存储引擎,也是默认的存储引擎,目前正在开发中。它专门为消息队列场景设计,可以提供低延时、高吞吐的持久化存储能力。

MySQL 引擎已经支持,适合需要将消息数据直接存储到 MySQL 的业务场景。Redis、MinIO、S3 等引擎还在规划中。MinIO 和 S3 主要面向大规模数据场景,可以提供低成本的对象存储能力。

远程存储的优势是支持分布式部署、多副本、高可靠,适合生产环境的高可靠和持久化场景。

Storage Adapter 架构

下图展示了 Storage Adapter 在整个系统中的位置:

img

核心能力定义

Storage Adapter 是 Broker 中的一个代码模块,通过 Trait 定义底层存储引擎需要具备的能力。主要包括以下几个方面:

Shard 管理能力负责分片的生命周期管理,包括创建分片、列出分片、删除分片等操作。数据写入能力支持单条写入和批量写入,写入后会返回 offset。

查询能力比较灵活,支持多个维度。可以按 offset 读取,也可以按 key 或 tag 读取,还支持根据时间戳查找对应的 offset。这种设计可以满足不同消息协议的查询需求。

消费组管理负责消费进度的管理,支持提交和获取消费 offset。消息过期功能负责数据的生命周期管理,支持基于时间和基于大小的数据淘汰策略。

只要实现了这些能力,任何存储系统都可以作为 RobustMQ 的底层存储引擎。

如何使用

配置方式

存储引擎通过启动配置来选择。配置文件中的 message_storage 部分用于指定存储引擎类型和相关配置:

json
"message_storage": {
  "storage_type": "Memory",
  "journal_config": null,
  "memory_config": null,
  "minio_config": null,
  "mysql_config": null,
  "rocksdb_config": null,
  "s3_config": null
}

storage_type 字段指定存储引擎类型,比如 Memory、RocksDB、MySQL 等。其他的 *_config 字段则是各个存储引擎的具体配置项。

多引擎支持

RobustMQ 支持同时配置多个存储引擎。集群启动时需要指定一个默认的存储引擎。创建 Topic 时可以指定使用哪个存储引擎,不同的 Topic 可以使用不同的存储引擎。如果创建 Topic 时没有指定,就会使用集群的默认存储引擎。

存储引擎变更

Topic 的存储引擎是可以变更的,但需要手动指定变更策略。一种策略是直接切换,不保留旧数据,这种方式适合测试环境或者历史数据不重要的场景。另一种策略是数据搬迁,把旧引擎的数据迁移到新引擎,适合生产环境和重要的历史数据。存储引擎变更是比较重要的操作,需要谨慎选择策略。

Shard 概念

设计背景

不同消息队列系统的存储单位各不相同,但本质上都采用 Append Only(只追加)模型。MQTT 和 AMQP 的存储单位是 Topic 和 Queue,Kafka 是 Partition,RocketMQ 是 MessageQueue,Pulsar 也是 Partition。虽然名字不一样,但存储模型都是一样的。

Shard 统一抽象

Storage Adapter 引入了 Shard 这个概念来统一抽象不同协议的消息数据存储。引入 Shard 主要是为了屏蔽不同消息系统的存储差异,提供统一的存储接口和语义,同时简化存储引擎的实现复杂度。

Shard 是 RobustMQ 的统一数据存储组织单位,类似于 Kafka 的 Partition、RocketMQ 的 MessageQueue、Pulsar 的 Partition。它支持分布式部署和多副本存储,提供 Append Only 的写入模型,同时支持按 offset、key、tag、timestamp 等多个维度进行查询。

通过 Shard 这个统一的抽象,RobustMQ 可以用同样的方式来处理 MQTT、AMQP、Kafka 等多种协议的消息存储需求,存储引擎也只需要实现一套接口就可以支持所有协议。