RobustMQ 的混合架构探索:tokio + 存储层剥离
背景
在上一篇文章(Tokio 调度抖动与 io_uring:从 Iggy 的探索到 RobustMQ 的思考)中,我们分析了 tokio work-stealing 调度器在高并发存储路径上的固有抖动问题,也梳理了 Iggy 花费两年时间完成的从 tokio 到 compio + io_uring 的全量迁移。
Iggy 的选择是激进的:把整个系统的运行时替换掉。这条路是对的,但代价巨大。
我们的思路不同:不替换 tokio,而是把存储密集型路径从 tokio 的调度竞争中剥离出去。
但这个思路也有它的边界和代价,需要说清楚。
问题回顾
在 1000 个并发 MQTT 连接的压测下,meta-service 的 raft apply 路径出现明显抖动:
WARN meta_service::raft::store::state: apply batch slow: total=11.07ms route=0.07ms set_last_applied=11.00ms
WARN meta_service::raft::store::state: apply batch slow: total=8.76ms route=0.05ms set_last_applied=8.71ms
WARN meta_service::raft::store::state: apply batch slow: total=10.10ms route=0.07ms set_last_applied=10.02msset_last_applied 只做一件事:向 RocksDB 写一条记录。RocksDB 监控显示实际写入耗时 < 1ms。
多出来的 5~10ms 是 tokio 调度等待时间——task 在 await 之后重新排队,等待 worker 线程空出来执行它。
同一时间,MQTT Broker 端的连接处理:
WARN mqtt_broker::mqtt::connect: slow total=51.11ms session_process=50.91ms
WARN mqtt_broker::mqtt::connect: slow total=31.47ms session_process=31.32mssession_process = gRPC 往返 + raft 共识 + apply 执行。raft apply 里的调度抖动,通过 gRPC 链路,放大成 30~51ms 的连接延迟。
根因:tokio worker 线程资源被高并发 MQTT 连接耗尽,apply task 排队等待调度。
为什么不全量迁移
全量换运行时(tokio → compio)的问题:
- 迁移成本极高:网络层、gRPC 服务、所有 async 代码都深度依赖 tokio 生态,Iggy 花了将近两年
- 生态割裂:tonic、hyper、tracing 等无法直接在 compio 上使用,需要重建整个技术栈
- 风险不可控:全量迁移期间系统稳定性难以保证
更根本的是:RobustMQ 的网络层和业务逻辑层本身没有问题。tokio 非常擅长高并发网络 I/O,MQTT 连接管理、协议解析、订阅匹配、消息路由都是 tokio 的强项。抖动集中在存储写入路径上。
用手术刀,不用推土机。
混合架构的思路
整体结构
┌──────────────────────────────────────────────────────────────────┐
│ tokio runtime(主运行时) │
│ │
│ MQTT Broker meta-service │
│ ───────────── ───────────────────── │
│ 网络连接管理 gRPC 服务器 │
│ MQTT 协议解析 raft 共识逻辑 ─── channel ──►│
│ 订阅匹配路由 业务缓存更新 │
│ 消息推送 (tokio task) ◄── oneshot ───│
│ │
└──────────────────────────────────────────────────────────────────┘
│
┌────────────────────────────────────────────────────────┘
│ 独立存储线程池
│
│ meta-service 路径:std::thread + spawn_blocking
│ thread-0 thread-1 thread-2 ...
│ │ RocksDB write │ RocksDB write │ ...
│ │ (同步调用,但不占 tokio worker)
│
│ storage engine 路径:compio + io_uring
│ thread-0 (core 0) thread-1 (core 1) ...
│ ┌────────────────┐ ┌────────────────┐
│ │ compio runtime │ │ compio runtime │
│ │ io_uring loop │ │ io_uring loop │
│ │ file.write() │ │ file.write() │ ← 真正的 completion-based I/O
│ └────────────────┘ └────────────────┘
└──────────────────────────────────────────────────────两个部分在同一进程内,通过 channel 通信:
- tokio 侧发送写请求,用
oneshot::channel等待结果 - 存储线程完成操作,通过 oneshot 回复
- 不共享状态,没有锁
关键区分:两类存储路径,不同的解法
这里必须说清楚一个重要区别:meta-service(RocksDB)和 storage engine(文件写入)适合不同的方案,收益来源完全不同。
meta-service 路径:spawn_blocking 就够了
问题所在
set_last_applied 的抖动来自 tokio 调度等待,不是 RocksDB 慢。证据:RocksDB 监控 < 1ms,但计时显示 5~11ms。
io_uring 在这里几乎没有用武之地。
原因:RocksDB 是 C++ 库,通过 rocksdb-rs 调用,内部使用传统的 pwrite/fsync 系统调用,我们无法从外部把它的内部 I/O 换成 io_uring(除非深度修改 RocksDB,通过它的 Env 接口自定义 I/O 后端,极其复杂)。
调用 db.put_cf() 时,无论外面包了什么 executor,内部都是同步写入。
真正需要的:把 RocksDB 调用移出 tokio worker 线程
用 tokio::task::spawn_blocking 就能解决 80% 的问题:
// 原来:在 tokio worker 线程上同步调用 RocksDB
// 高并发时 worker 被占满,apply task 排队
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
self.db.put_cf(&cf, KEY_LAST_APPLIED, encode(&log_id)?)?;
Ok(())
}
// 改后:把 RocksDB 调用移到 blocking 线程池
// tokio worker 立即释放,不参与调度竞争
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
let db = self.db.clone();
let value = encode(&log_id)?;
tokio::task::spawn_blocking(move || {
db.put_cf(&cf, KEY_LAST_APPLIED, &value)
})
.await??;
Ok(())
}spawn_blocking 把调用丢给 tokio 的阻塞线程池(默认上限 512 个线程),当前 tokio worker 立即可以处理其他 task,消除调度积压。
如果想要更精确的控制(比如固定线程数、CPU 亲和性、避免 tokio 阻塞线程池被其他调用者耗尽),也可以自建线程池:
// 自建固定大小的 RocksDB 写入线程池
pub struct RocksDBWritePool {
tx: std::sync::mpsc::SyncSender<WriteTask>,
}
impl RocksDBWritePool {
pub fn start(num_threads: usize) -> Arc<Self> {
let (tx, rx) = std::sync::mpsc::sync_channel(8192);
let rx = Arc::new(std::sync::Mutex::new(rx));
for _ in 0..num_threads {
let rx = rx.clone();
std::thread::spawn(move || {
loop {
match rx.lock().unwrap().recv() {
Ok(task) => task.execute(),
Err(_) => return,
}
}
});
}
Arc::new(Self { tx })
}
pub async fn write(&self, f: impl FnOnce() -> Result<(), Error> + Send + 'static)
-> Result<(), Error>
{
let (reply_tx, reply_rx) = oneshot::channel();
self.tx.send(WriteTask { f: Box::new(f), reply: reply_tx }).ok();
reply_rx.await.unwrap()
}
}收益来源:不阻塞 tokio worker,消除调度等待。RocksDB 写入本身的速度没有变化。
storage engine 路径:这里才是 io_uring 的主场
为什么 storage engine 不同
storage engine 的核心路径是顺序写文件:消息写入 segment 文件,每次追加数据到文件末尾。这里我们完全控制文件 fd,可以直接使用 io_uring 的文件 I/O 接口。
tokio 对文件 I/O 的处理方式:
tokio::fs::write→ 内部用spawn_blocking→ 阻塞线程池执行同步write()- 本质是:用线程并发模拟 I/O 并发,每个并发写入占用一个线程
io_uring 的处理方式:
- 把操作描述(写哪个 fd、写什么数据、写到哪个偏移)提交到 submission queue
- 内核异步执行,完成后放入 completion queue
- 一个线程可以同时 pending 数百个 I/O 操作,不需要为每个操作分配一个线程
这个差异在顺序写场景下意义重大:多个 producer 同时向同一个 segment 文件追加消息时,io_uring 可以把多次 write 批量提交给内核,由内核调度合并,大幅减少系统调用次数和上下文切换。
compio + thread-per-core
storage engine 的存储线程使用 compio(io_uring executor)+ CPU 亲和性绑定:
pub struct FileWriteExecutor {
shards: Vec<std::sync::mpsc::SyncSender<FileTask>>,
}
impl FileWriteExecutor {
pub fn start(num_threads: usize) -> Arc<Self> {
let mut shards = Vec::with_capacity(num_threads);
for core_id in 0..num_threads {
let (tx, rx) = std::sync::mpsc::sync_channel::<FileTask>(8192);
shards.push(tx);
std::thread::Builder::new()
.name(format!("storage-io-{}", core_id))
.spawn(move || {
// 绑定到固定 CPU core,保持 page cache 和 TLB 热度
core_affinity::set_for_current(CoreId { id: core_id }).ok();
// compio runtime = io_uring event loop
compio::runtime::Runtime::new()
.unwrap()
.block_on(file_write_thread_main(rx));
})
.unwrap();
}
Arc::new(Self { shards })
}
/// partition_id 决定落到哪个 shard,同一 partition 的写入有序
pub async fn append(
&self,
partition_id: u64,
path: PathBuf,
data: Bytes,
) -> Result<u64, IoError> {
let shard = (partition_id as usize) % self.shards.len();
let (reply_tx, reply_rx) = oneshot::channel();
self.shards[shard]
.send(FileTask::Append { path, data, reply: reply_tx })
.map_err(|_| IoError::ExecutorShutdown)?;
reply_rx.await.map_err(|_| IoError::ReplyDropped)?
}
}
async fn file_write_thread_main(rx: std::sync::mpsc::Receiver<FileTask>) {
loop {
// 在 compio context 内接收任务
let task = compio::runtime::spawn_blocking(move || rx.recv())
.await
.unwrap();
match task {
Ok(FileTask::Append { path, data, reply }) => {
// io_uring 异步文件追加
// 内核持有 buffer 直到写入完成,应用层 await completion
let result = async {
let file = compio::fs::OpenOptions::new()
.append(true)
.open(&path)
.await?;
let (_, offset) = file.write_all_at(data, u64::MAX).await?;
Ok::<u64, std::io::Error>(offset)
}
.await
.map_err(IoError::Io);
let _ = reply.send(result);
}
Err(_) => return,
}
}
}收益来源:真正的 completion-based 异步 I/O + CPU 亲和性减少 cache 失效 + 批量 I/O 提交减少系统调用。
为什么混合架构有局限性
理解这个方案的局限和 Iggy 全量迁移的差距,和理解它能解决什么同样重要。
局限一:解决的是症状,不是根因
真正的根因是 tokio worker 线程被高并发连接耗尽。混合架构让存储路径不再占用 tokio worker,但 apply 的业务逻辑(decode、cache 更新等其他 await 点)仍在 tokio 上运行。随着连接数继续增长,其他 await 点同样会出现调度等待。这是在给 tokio 减负,而不是从架构上消除不确定性。
局限二:两个 runtime 共存,资源调度复杂
进程里同时跑 tokio 和 compio,CPU core 的分配需要仔细规划——io_uring 线程绑定了某些 core,tokio worker 线程也会调度到这些 core,产生竞争。线程总数增加(tokio worker + io_uring 线程 + RocksDB compaction 线程),在某些情况下可能反而因为线程竞争变差。
局限三:tokio/io_uring 边界有开销
每次 tokio → 存储线程的通信:mpsc::send + await oneshot + 线程唤醒。这个开销在 1~5μs 量级,单次不显著,但高频写入路径下会累积。Iggy 全量迁移后这个边界完全不存在。
局限四:io_uring 对 RocksDB 无效
如前所述,io_uring 对 RocksDB 路径没有直接收益。meta-service 的改善来自"不阻塞 tokio worker",而不是 I/O 模型升级。Iggy 全量换到 compio 之后,所有路径(包括网络 I/O)都受益于 io_uring + thread-per-core,我们的方案只有文件写入路径能得到完整的 io_uring 收益。
与 Iggy 全量迁移的对比
| 维度 | Iggy(全量迁移) | RobustMQ(混合架构) |
|---|---|---|
| 运行时 | tokio → compio(全替换) | tokio 保留,存储层剥离 |
| RocksDB 路径改善 | 同步调用,不阻塞 tokio worker | 同步调用,不阻塞 tokio worker(相同) |
| 文件 I/O 改善 | 全链路 io_uring,真正异步 | 存储线程内 io_uring,部分受益 |
| 网络路径改善 | 全链路 io_uring + thread-per-core | 无(tokio 不动) |
| 调度抖动消除 | 全链路消除 | 存储路径消除,业务逻辑路径仍有 |
| 迁移成本 | 极高(~2 年) | 低(按路径渐进) |
| 生态兼容 | 需要重建 | tonic、tracing 完全保留 |
| 工程风险 | 高 | 低 |
简单说:Iggy 的方案理论上限更高,混合架构工程成本更低、风险更小。选哪个取决于系统规模和团队资源。对于当前的 RobustMQ,混合架构是更务实的路径。
预期效果
基于对问题的分析(不是 Iggy 数据的直接套用,因为场景不完全相同):
meta-service(spawn_blocking 改造):
set_last_applied的调度等待从 5~11ms 降至接近 0session_process从 30~51ms 有明显下降(减去 raft apply 的调度等待部分)- gRPC 往返和 raft 共识本身的延迟不变
storage engine(io_uring 改造):
- 高并发写入下的 P99/P999 延迟分布更窄
- 顺序写吞吐有所提升,得益于批量 I/O 提交
- 具体幅度需要压测验证
下一步
方案还在设计阶段,尚未完整实现。计划的推进顺序:
- 最小验证:用
spawn_blocking改造set_last_applied,和现有实现做 A/B 对比,验证调度等待确实消除 - meta-service 改造:完成 raft 存储路径(
set_last_applied+append_entries+ snapshot) - storage engine 改造:segment 文件写入切换到 compio + io_uring
- 压测验证:1000/5000/10000 并发连接下的端到端延迟对比,看尾延迟分布
第一步成本极低,可以快速得到数据。这条路有明确的问题定位(我们自己的日志)和可控的迁移范围,我们会持续记录进展。
