Skip to content

RobustMQ 存储层设计思考(下):从理念到方案

在上一篇文章《RobustMQ:关于消息队列存储层的一些想法》中,我分享了对存储层的一些初步思考:为什么要存算分离、消息队列的多样化场景、以及插件化存储的设计理念。

那篇文章更多是"想法"层面的探讨。这段时间,随着 Storage Engine 的深入设计,很多具体问题逐渐清晰。本文分享我在核心存储引擎设计中的思考过程:遇到了哪些问题,如何一步步找到解决方案,以及为什么做出这些选择。

插件化存储的本质

在进入具体设计之前,先简单回顾一下插件化存储的核心思想。面对消息队列的多样化场景需求,RobustMQ 的答案是插件化存储,但这不是为了技术炫技。

插件化的核心价值是场景适配。Storage Engine 是核心,专门为生产环境集群化部署设计,系统性解决弹性、成本、一致性问题。Memory 和 RocksDB 服务 Standalone 模式和边缘场景,特点是无外部依赖、安装包小。MySQL、Redis 虽然用得少,但有些企业有特殊需求,比如合规要求数据必须在指定数据库,或者想复用现有基础设施。

插件化不是为了"支持很多存储",而是在架构上留足扩展空间。当用户遇到我们没预料到的场景时,可以自己实现 Storage Adapter,而不是被迫放弃或 fork 代码。这是架构开放性,而非功能堆砌。

本文重点讨论的是 Storage Engine 的设计思考。这是 RobustMQ 的核心,也是我投入最多精力的部分。

问题一:高并发下如何保证顺序

Storage Engine 要解决的第一个问题是:同一个 Partition 内的消息 offset 必须严格连续,在高并发写入场景下如何实现?

最直观的想法是用锁。每个 Partition 一把锁,写入时获取锁、分配 offset、写文件、释放锁。实现很简单,二十行代码搞定。但问题在于:每个线程获取锁后通常只写一条消息就释放,导致大量的小 I/O 操作。频繁的小写入对磁盘很不友好,无论 SSD 还是 HDD,批量写入的效率都远高于零散写入。而且锁的持有时间包含了磁盘 I/O,高并发时锁竞争会很严重。

我的解决方案是 I/O Pool 架构。用固定数量的 I/O Worker(比如 16 个)管理所有 Partition。通过 partition_id % worker_count 的固定映射,保证同一 Partition 的请求总是路由到同一 Worker。Worker 批量接收请求,先等待第一个请求,然后非阻塞地收集后续请求,可能一次收集几百上千个。这些请求按 partition 分组后批量处理,一次 fsync 可以持久化几百上千条消息。

这就把"每条消息一次小写入"变成了"批量消息一次大写入"。同时内存占用只跟 Worker 数量有关,这个设计的关键是:用队列排队代替锁等待,用批量处理提升磁盘效率

在这个过程中,我们也采用了零拷贝技术。使用 Rust 的 Bytes 类型,数据从网络到磁盘全程只有一份,通过引用计数在不同地方共享,避免了传统实现中的多次内存拷贝,进一步降低了 CPU 和内存开销。

问题二:索引如何构建

消息队列需要多种索引来支持不同的查询场景:按 offset 快速定位、按时间范围查询、按 key 或 tag 过滤。我选择用 RocksDB 来存储这些索引数据,因为 RocksDB 本身就是高性能的 KV 存储,天然适合索引场景,而且支持批量写入,正好匹配我们的批量处理模式。

接下来的问题是:索引应该同步构建还是异步构建?异步构建看起来可以提升写入性能,但问题很多。如何知道每条记录在文件中的 position?如何处理写入线程和索引构建线程对文件的竞态访问?如何保证索引不丢失?崩溃后如何恢复索引构建进度?实现复杂度很高,而且用户刚写入的数据可能因为索引还没构建完而查不到,体验很差。

我选择了同步构建索引。关键洞察是:在批量写入的场景下,索引构建的开销很小。Worker 批量处理 1000 条记录时,会同时构建这 1000 条记录的所有索引(offset 索引、时间索引、key 索引、tag 索引),然后通过 RocksDB 的 WriteBatch 一次性写入。数据文件写入需要一次 I/O,RocksDB 索引写入也需要一次 I/O,总共两次 I/O。增加的延迟约 1ms,但换来的是索引立即可用、数据和索引强一致、崩溃恢复简单。

对于 offset 索引,我采用稀疏索引策略。不是每条记录都建索引,而是每 1000 条记录一个索引点。查询时先通过 RocksDB 定位到最近的索引点,获得文件 position,然后从该位置顺序扫描最多 1000 条记录即可找到目标数据。稀疏索引占用极小(1000 万条记录约 240KB),但查询速度仍然很快(整体延迟约 2ms)。时间索引、key 索引、tag 索引可以根据业务需求选择性构建,都通过 RocksDB 批量写入,开销可控。

问题三:一致性协议的选择

RobustMQ 采用存算分离,Segment 分布在多个 Storage Node。副本间如何保证一致性?我对比了 ISR 和 Quorum 两种方案。

Quorum 看起来很美:无单点、故障恢复快、更符合存算分离理念。Broker 并发写 3 个 Storage Node,等待 2 个确认就返回。但我发现了一个核心问题:任何副本都可能不完整

比如写入 1000 条消息,可能 Storage A 缺了 100 条,B 缺了 120 条,C 缺了 80 条。虽然每条消息都满足 Quorum,但没有任何副本是完整的。读取时如果恰好读到缺失的副本,就会失败,需要重试。更严重的是顺序消费场景,消费者可能因为遇到"空洞"而误以为消费完了,实际漏掉了后续数据。

Quorum 依赖复杂的后台修复逻辑:定期扫描所有 Segment,检查副本完整性,自动补齐缺失数据。这个修复逻辑本身就很复杂,需要处理修复速度跟不上写入、修复期间的读取失败、大规模场景的修复延迟等问题。而且如果修复做不好(作为新项目初期很难做好),缺失会持续累积,读取失败率上升,形成恶性循环。这是实实在在的工程风险。

我最终选择了 ISR。虽然有 Leader,但 Leader 保证数据完整,ISR Follower 也完整,读取 100% 成功,无需后台修复。Follower 通过 Pull 模式主动拉取,在大 QPS 小消息场景下性能更好,因为可以批量拉取,网络请求从百万级降到百级。这是工程上的务实选择:选简单可靠而非理论完美

虽然有 Leader,但通过合理的 Segment 分布,Leader 分散在不同 Storage Node,不会形成瓶颈。Broker 层仍然无状态,存算分离的核心优势保留。我们也提供灵活的 acks 配置,让用户在强一致(acks=all)、平衡(acks=quorum)、高性能(acks=1)之间自由选择。

问题四:如何降低 Leader 管理成本

采用 ISR 后,新问题出现了:如果每个 Segment 都有 Leader,数量会很庞大。1000 个 Shard,每个 100 个 Segment,就需要管理 10 万个 Leader。Leader 选举、ISR 维护、心跳监控的开销会非常大。

我的解决方案是区分 Active Segment 和 Sealed Segment。只有正在写入的 Active Segment 才有 Leader 和 ISR 机制。当 Segment 写满(比如达到 1GB)或达到时间阈值时,会被封存为 Sealed Segment。封存时 Leader 会等待所有 ISR Follower 完全追上,并验证所有副本的一致性。只有在所有副本都完整且一致后,才会标记为 Sealed,然后释放 Leader 角色。

Sealed Segment 没有 Leader,所有副本地位平等,可以从任意副本读取。这样 Leader 数量就等于 Shard 数量,而不是 Segment 总数。1000 个 Shard 只需要 1000 个 Leader,管理开销降低 100 倍。而且 Sealed Segment 的读取可以负载均衡到所有副本,99% 的历史数据读取压力分散到所有 Storage Node,充分利用存储资源。

问题五:如何避免数据迁移

Kafka 扩容时需要在 Broker 间拷贝数据,这个过程可能耗时几小时甚至几天,还会影响线上性能。我思考的是:能否完全避免数据迁移?

答案是:分段 Segment 设计天然解决了这个问题。当新增 Storage Node 时,不需要迁移任何历史数据。只需要等待当前 Active Segment 写满,新创建的 Segment 就会被分配到新节点上,开始承担流量。由于高流量场景下 1GB 的 Segment 可能在几分钟到几十分钟内就会写满,新节点很快就能分担负载。整个过程对业务完全透明,无性能损失。

如果业务特别紧急需要立即分散负载,也可以手动触发封存操作,强制将当前 Active Segment 标记为 Sealed。新创建的 Segment 会立即分配到新节点,瞬间实现负载转移。但一般情况下这个手动操作并不需要,因为大流量场景下 Segment 会快速写满,系统会自动完成切换。

历史数据保留在原节点上,作为 Sealed Segment 继续提供读取服务。如果需要释放老节点的空间,可以通过分层存储将 Sealed Segment 迁移到 S3,这个迁移是后台异步进行的,不影响业务。缩容时也是类似的逻辑,停止在要下线的节点上创建新 Segment,等待其上的 Active Segment 自然写满并封存,然后将 Sealed Segment 迁移走或者保留在 S3,节点即可下线。

这种设计相比 Kafka 的数据迁移方式,优势明显:扩容立即生效(新 Segment 马上创建),无历史数据迁移(避免大量数据拷贝),对业务无影响(读写不中断),操作简单(自动化完成)。这是分段存储设计带来的自然优势,不迁移历史数据,只让新数据流向新节点

问题六:成本优化与分层存储

Sealed Segment 的不可变特性,让分层存储的实现变得非常简单。由于 Segment 不可变且所有副本完整,可以从任意副本读取完整文件,直接上传到 S3,更新元数据即可。不需要复杂的协调机制,失败了重试即可。

我们的分层存储策略是这样的:热数据层的 Active Segment 存储在本地 SSD,使用 3 副本和 ISR 机制,提供最低延迟和最高吞吐。温数据层的 Recent Sealed Segment 仍在本地存储(SSD 或 HDD),任意副本可读,延迟稍高但仍然很快。冷数据层的 Old Sealed Segment 迁移到 S3,延迟虽然增加到 50ms,但成本极低且容量无限。通过分层存储,本地只需保留近期热数据(比如 7 天),历史数据全部在 S3。对于每天写入 1TB、保留 1 年的场景,存储成本可以从 21 万美元降低到 1 万美元,节省 95%。

更灵活的是副本策略的选择。多副本加本地存储是默认配置,适合核心业务数据。但对于日志归档类数据,可以配置单副本直接写 S3,降低本地存储成本,避免集群内网络复制开销,依赖 S3 的高可靠性保证。对于实时推送、行情数据等极低延迟场景,可以配置单副本或多副本内存存储,延迟可以达到 100µs 级别,代价是可能丢数据但业务可以接受。内存加异步持久化则兼顾了性能和可靠性,适合会话状态等场景。

不同场景选择不同策略,在性能、成本、可靠性之间灵活权衡。这种灵活性来自于对真实需求的理解,而不是技术上的堆砌。

意外的收获:数据湖集成

在思考分层存储时,我发现了一个额外价值:Sealed Segment 天然适合写入数据湖。Segment 不可变且完整,迁移到 S3 时可以转换为 Parquet 格式,Spark、Hive 等分析工具可以直接查询。这让 RobustMQ 不只是消息队列,还成为数据基础设施的一部分。消息数据直接成为数据湖源头,无需 ETL,支持实时数仓和离线分析。历史数据在 S3,可以随时回溯分析,满足审计和合规要求。

这不是一开始计划的功能,而是在思考分层存储时自然涌现的价值。好的设计往往会带来意外收获。

设计哲学:务实主义

回顾整个设计过程,我的核心思考模式是场景驱动、问题导向、务实选择

每个技术决策都基于实际问题:高并发用 I/O Pool 解决批量问题,索引用 RocksDB 提升查询性能,同步构建避免一致性问题,一致性选 ISR 避免修复复杂度,Active/Sealed 分离降低 Leader 成本,分段设计实现无迁移扩容,分层存储优化成本。这些不是技术上的炫技,而是对实际问题的直接回应。

我们相信,在存储层这个基础组件上,应该选择经过验证的成熟方案,并在此基础上做针对性优化,而不是追求理论上完美但工程上充满风险的方案。选择 ISR 而不是 Quorum,是因为看重数据完整性的简单保证。选择同步构建索引,是因为批量写入下开销很小但收益明显。选择 Active/Sealed 分离,是因为这个简单优化就能大幅降低管理成本。选择分段设计,是因为它自然解决了数据迁移难题。

Storage Engine 服务主流场景用成熟技术,其他存储选项(Memory、RocksDB、MySQL、Redis、S3)服务长尾场景,插件化架构留足未来扩展空间。当我们谈论存储层的灵活性时,指的不是技术上的灵活性,而是场景适配的灵活性。同一个 RobustMQ,通过不同配置,可以是高性能集群消息队列,可以是低成本日志收集系统,可以是边缘轻量级部署,可以是极低延迟内存推送。

下一步

本文分享的是我在 Storage Engine 设计中的思考过程。但坦白说,具体的实现细节还在持续优化中,这是一个在实践中验证和完善的过程。

我们的思路是:先适配 MQTT 场景实现存储,在实践中验证设计的合理性,然后一步步完善整体的存储模型。MQTT 场景会是我们的第一个深度验证,Kafka 场景会是第二个,每个场景都会带来新的挑战和启发。

存储引擎的设计是 RobustMQ 最核心的技术挑战之一。我们会持续探索,持续优化,在实践中验证理念。从内核开始,一步一个脚印。这是我们的选择,也是我们的坚持。


关于 RobustMQ

RobustMQ 是一个用 Rust 编写的高性能消息队列,采用存算分离架构,目标是成为下一代消息中间件基础设施。欢迎关注我们的 GitHub,参与技术讨论。