Skip to content

RobustMQ 的混合架构探索:tokio + 存储层剥离

背景

在上一篇文章(Tokio 调度抖动与 io_uring:从 Iggy 的探索到 RobustMQ 的思考)中,我们分析了 tokio work-stealing 调度器在高并发存储路径上的固有抖动问题,也梳理了 Iggy 花费两年时间完成的从 tokio 到 compio + io_uring 的全量迁移。

Iggy 的选择是激进的:把整个系统的运行时替换掉。这条路是对的,但代价巨大。

我们的思路不同:不替换 tokio,而是把存储密集型路径从 tokio 的调度竞争中剥离出去

但这个思路也有它的边界和代价,需要说清楚。


问题回顾

在 1000 个并发 MQTT 连接的压测下,meta-service 的 raft apply 路径出现明显抖动:

log
WARN meta_service::raft::store::state: apply batch slow: total=11.07ms route=0.07ms set_last_applied=11.00ms
WARN meta_service::raft::store::state: apply batch slow: total=8.76ms  route=0.05ms set_last_applied=8.71ms
WARN meta_service::raft::store::state: apply batch slow: total=10.10ms route=0.07ms set_last_applied=10.02ms

set_last_applied 只做一件事:向 RocksDB 写一条记录。RocksDB 监控显示实际写入耗时 < 1ms。

多出来的 5~10ms 是 tokio 调度等待时间——task 在 await 之后重新排队,等待 worker 线程空出来执行它。

同一时间,MQTT Broker 端的连接处理:

log
WARN mqtt_broker::mqtt::connect: slow total=51.11ms session_process=50.91ms
WARN mqtt_broker::mqtt::connect: slow total=31.47ms session_process=31.32ms

session_process = gRPC 往返 + raft 共识 + apply 执行。raft apply 里的调度抖动,通过 gRPC 链路,放大成 30~51ms 的连接延迟。

根因:tokio worker 线程资源被高并发 MQTT 连接耗尽,apply task 排队等待调度。


为什么不全量迁移

全量换运行时(tokio → compio)的问题:

  1. 迁移成本极高:网络层、gRPC 服务、所有 async 代码都深度依赖 tokio 生态,Iggy 花了将近两年
  2. 生态割裂:tonic、hyper、tracing 等无法直接在 compio 上使用,需要重建整个技术栈
  3. 风险不可控:全量迁移期间系统稳定性难以保证

更根本的是:RobustMQ 的网络层和业务逻辑层本身没有问题。tokio 非常擅长高并发网络 I/O,MQTT 连接管理、协议解析、订阅匹配、消息路由都是 tokio 的强项。抖动集中在存储写入路径上。

用手术刀,不用推土机。


混合架构的思路

整体结构

┌──────────────────────────────────────────────────────────────────┐
│                     tokio runtime(主运行时)                      │
│                                                                  │
│  MQTT Broker               meta-service                          │
│  ─────────────             ─────────────────────                 │
│  网络连接管理               gRPC 服务器                            │
│  MQTT 协议解析              raft 共识逻辑           ─── channel ──►│
│  订阅匹配路由               业务缓存更新                            │
│  消息推送                   (tokio task)         ◄── oneshot ───│
│                                                                  │
└──────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────┘
│  独立存储线程池

│  meta-service 路径:std::thread + spawn_blocking
│  thread-0  thread-1  thread-2  ...
│  │ RocksDB write │ RocksDB write │ ...
│  │ (同步调用,但不占 tokio worker)

│  storage engine 路径:compio + io_uring
│  thread-0 (core 0)    thread-1 (core 1)   ...
│  ┌────────────────┐   ┌────────────────┐
│  │ compio runtime │   │ compio runtime │
│  │ io_uring loop  │   │ io_uring loop  │
│  │ file.write()   │   │ file.write()   │  ← 真正的 completion-based I/O
│  └────────────────┘   └────────────────┘
└──────────────────────────────────────────────────────

两个部分在同一进程内,通过 channel 通信:

  • tokio 侧发送写请求,用 oneshot::channel 等待结果
  • 存储线程完成操作,通过 oneshot 回复
  • 不共享状态,没有锁

关键区分:两类存储路径,不同的解法

这里必须说清楚一个重要区别:meta-service(RocksDB)和 storage engine(文件写入)适合不同的方案,收益来源完全不同。


meta-service 路径:spawn_blocking 就够了

问题所在

set_last_applied 的抖动来自 tokio 调度等待,不是 RocksDB 慢。证据:RocksDB 监控 < 1ms,但计时显示 5~11ms。

io_uring 在这里几乎没有用武之地。

原因:RocksDB 是 C++ 库,通过 rocksdb-rs 调用,内部使用传统的 pwrite/fsync 系统调用,我们无法从外部把它的内部 I/O 换成 io_uring(除非深度修改 RocksDB,通过它的 Env 接口自定义 I/O 后端,极其复杂)。

调用 db.put_cf() 时,无论外面包了什么 executor,内部都是同步写入。

真正需要的:把 RocksDB 调用移出 tokio worker 线程

tokio::task::spawn_blocking 就能解决 80% 的问题:

rust
// 原来:在 tokio worker 线程上同步调用 RocksDB
// 高并发时 worker 被占满,apply task 排队
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
    self.db.put_cf(&cf, KEY_LAST_APPLIED, encode(&log_id)?)?;
    Ok(())
}

// 改后:把 RocksDB 调用移到 blocking 线程池
// tokio worker 立即释放,不参与调度竞争
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
    let db = self.db.clone();
    let value = encode(&log_id)?;
    tokio::task::spawn_blocking(move || {
        db.put_cf(&cf, KEY_LAST_APPLIED, &value)
    })
    .await??;
    Ok(())
}

spawn_blocking 把调用丢给 tokio 的阻塞线程池(默认上限 512 个线程),当前 tokio worker 立即可以处理其他 task,消除调度积压。

如果想要更精确的控制(比如固定线程数、CPU 亲和性、避免 tokio 阻塞线程池被其他调用者耗尽),也可以自建线程池:

rust
// 自建固定大小的 RocksDB 写入线程池
pub struct RocksDBWritePool {
    tx: std::sync::mpsc::SyncSender<WriteTask>,
}

impl RocksDBWritePool {
    pub fn start(num_threads: usize) -> Arc<Self> {
        let (tx, rx) = std::sync::mpsc::sync_channel(8192);
        let rx = Arc::new(std::sync::Mutex::new(rx));

        for _ in 0..num_threads {
            let rx = rx.clone();
            std::thread::spawn(move || {
                loop {
                    match rx.lock().unwrap().recv() {
                        Ok(task) => task.execute(),
                        Err(_) => return,
                    }
                }
            });
        }

        Arc::new(Self { tx })
    }

    pub async fn write(&self, f: impl FnOnce() -> Result<(), Error> + Send + 'static)
        -> Result<(), Error>
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx.send(WriteTask { f: Box::new(f), reply: reply_tx }).ok();
        reply_rx.await.unwrap()
    }
}

收益来源:不阻塞 tokio worker,消除调度等待。RocksDB 写入本身的速度没有变化。


storage engine 路径:这里才是 io_uring 的主场

为什么 storage engine 不同

storage engine 的核心路径是顺序写文件:消息写入 segment 文件,每次追加数据到文件末尾。这里我们完全控制文件 fd,可以直接使用 io_uring 的文件 I/O 接口。

tokio 对文件 I/O 的处理方式:

  • tokio::fs::write → 内部用 spawn_blocking → 阻塞线程池执行同步 write()
  • 本质是:用线程并发模拟 I/O 并发,每个并发写入占用一个线程

io_uring 的处理方式:

  • 把操作描述(写哪个 fd、写什么数据、写到哪个偏移)提交到 submission queue
  • 内核异步执行,完成后放入 completion queue
  • 一个线程可以同时 pending 数百个 I/O 操作,不需要为每个操作分配一个线程

这个差异在顺序写场景下意义重大:多个 producer 同时向同一个 segment 文件追加消息时,io_uring 可以把多次 write 批量提交给内核,由内核调度合并,大幅减少系统调用次数和上下文切换。

compio + thread-per-core

storage engine 的存储线程使用 compio(io_uring executor)+ CPU 亲和性绑定:

rust
pub struct FileWriteExecutor {
    shards: Vec<std::sync::mpsc::SyncSender<FileTask>>,
}

impl FileWriteExecutor {
    pub fn start(num_threads: usize) -> Arc<Self> {
        let mut shards = Vec::with_capacity(num_threads);

        for core_id in 0..num_threads {
            let (tx, rx) = std::sync::mpsc::sync_channel::<FileTask>(8192);
            shards.push(tx);

            std::thread::Builder::new()
                .name(format!("storage-io-{}", core_id))
                .spawn(move || {
                    // 绑定到固定 CPU core,保持 page cache 和 TLB 热度
                    core_affinity::set_for_current(CoreId { id: core_id }).ok();

                    // compio runtime = io_uring event loop
                    compio::runtime::Runtime::new()
                        .unwrap()
                        .block_on(file_write_thread_main(rx));
                })
                .unwrap();
        }

        Arc::new(Self { shards })
    }

    /// partition_id 决定落到哪个 shard,同一 partition 的写入有序
    pub async fn append(
        &self,
        partition_id: u64,
        path: PathBuf,
        data: Bytes,
    ) -> Result<u64, IoError> {
        let shard = (partition_id as usize) % self.shards.len();
        let (reply_tx, reply_rx) = oneshot::channel();
        self.shards[shard]
            .send(FileTask::Append { path, data, reply: reply_tx })
            .map_err(|_| IoError::ExecutorShutdown)?;
        reply_rx.await.map_err(|_| IoError::ReplyDropped)?
    }
}

async fn file_write_thread_main(rx: std::sync::mpsc::Receiver<FileTask>) {
    loop {
        // 在 compio context 内接收任务
        let task = compio::runtime::spawn_blocking(move || rx.recv())
            .await
            .unwrap();

        match task {
            Ok(FileTask::Append { path, data, reply }) => {
                // io_uring 异步文件追加
                // 内核持有 buffer 直到写入完成,应用层 await completion
                let result = async {
                    let file = compio::fs::OpenOptions::new()
                        .append(true)
                        .open(&path)
                        .await?;
                    let (_, offset) = file.write_all_at(data, u64::MAX).await?;
                    Ok::<u64, std::io::Error>(offset)
                }
                .await
                .map_err(IoError::Io);
                let _ = reply.send(result);
            }
            Err(_) => return,
        }
    }
}

收益来源:真正的 completion-based 异步 I/O + CPU 亲和性减少 cache 失效 + 批量 I/O 提交减少系统调用。


为什么混合架构有局限性

理解这个方案的局限和 Iggy 全量迁移的差距,和理解它能解决什么同样重要。

局限一:解决的是症状,不是根因

真正的根因是 tokio worker 线程被高并发连接耗尽。混合架构让存储路径不再占用 tokio worker,但 apply 的业务逻辑(decode、cache 更新等其他 await 点)仍在 tokio 上运行。随着连接数继续增长,其他 await 点同样会出现调度等待。这是在给 tokio 减负,而不是从架构上消除不确定性。

局限二:两个 runtime 共存,资源调度复杂

进程里同时跑 tokio 和 compio,CPU core 的分配需要仔细规划——io_uring 线程绑定了某些 core,tokio worker 线程也会调度到这些 core,产生竞争。线程总数增加(tokio worker + io_uring 线程 + RocksDB compaction 线程),在某些情况下可能反而因为线程竞争变差。

局限三:tokio/io_uring 边界有开销

每次 tokio → 存储线程的通信:mpsc::send + await oneshot + 线程唤醒。这个开销在 1~5μs 量级,单次不显著,但高频写入路径下会累积。Iggy 全量迁移后这个边界完全不存在。

局限四:io_uring 对 RocksDB 无效

如前所述,io_uring 对 RocksDB 路径没有直接收益。meta-service 的改善来自"不阻塞 tokio worker",而不是 I/O 模型升级。Iggy 全量换到 compio 之后,所有路径(包括网络 I/O)都受益于 io_uring + thread-per-core,我们的方案只有文件写入路径能得到完整的 io_uring 收益。


与 Iggy 全量迁移的对比

维度Iggy(全量迁移)RobustMQ(混合架构)
运行时tokio → compio(全替换)tokio 保留,存储层剥离
RocksDB 路径改善同步调用,不阻塞 tokio worker同步调用,不阻塞 tokio worker(相同)
文件 I/O 改善全链路 io_uring,真正异步存储线程内 io_uring,部分受益
网络路径改善全链路 io_uring + thread-per-core无(tokio 不动)
调度抖动消除全链路消除存储路径消除,业务逻辑路径仍有
迁移成本极高(~2 年)低(按路径渐进)
生态兼容需要重建tonic、tracing 完全保留
工程风险

简单说:Iggy 的方案理论上限更高,混合架构工程成本更低、风险更小。选哪个取决于系统规模和团队资源。对于当前的 RobustMQ,混合架构是更务实的路径。


预期效果

基于对问题的分析(不是 Iggy 数据的直接套用,因为场景不完全相同):

meta-service(spawn_blocking 改造):

  • set_last_applied 的调度等待从 5~11ms 降至接近 0
  • session_process 从 30~51ms 有明显下降(减去 raft apply 的调度等待部分)
  • gRPC 往返和 raft 共识本身的延迟不变

storage engine(io_uring 改造):

  • 高并发写入下的 P99/P999 延迟分布更窄
  • 顺序写吞吐有所提升,得益于批量 I/O 提交
  • 具体幅度需要压测验证

下一步

方案还在设计阶段,尚未完整实现。计划的推进顺序:

  1. 最小验证:用 spawn_blocking 改造 set_last_applied,和现有实现做 A/B 对比,验证调度等待确实消除
  2. meta-service 改造:完成 raft 存储路径(set_last_applied + append_entries + snapshot)
  3. storage engine 改造:segment 文件写入切换到 compio + io_uring
  4. 压测验证:1000/5000/10000 并发连接下的端到端延迟对比,看尾延迟分布

第一步成本极低,可以快速得到数据。这条路有明确的问题定位(我们自己的日志)和可控的迁移范围,我们会持续记录进展。

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀