RobustMQ's Hybrid Architecture Exploration: tokio + Storage Layer Extraction
Background
In the previous article (Tokio Scheduling Jitter and io_uring: From Iggy's Exploration to RobustMQ's Thinking), we analyzed the inherent jitter problem of the tokio work-stealing scheduler on high-concurrency storage paths, and reviewed Iggy's full migration from tokio to compio + io_uring, which took nearly two years.
Iggy's choice was aggressive: replace the entire system's runtime. That approach is correct, but the cost is enormous.
Our thinking is different: instead of replacing tokio, extract the storage-intensive path out of tokio's scheduling contention.
But this approach has its own boundaries and costs, which need to be explained clearly.
Problem Review
Under a stress test with 1000 concurrent MQTT connections, the meta-service's raft apply path showed significant jitter:
WARN meta_service::raft::store::state: apply batch slow: total=11.07ms route=0.07ms set_last_applied=11.00ms
WARN meta_service::raft::store::state: apply batch slow: total=8.76ms route=0.05ms set_last_applied=8.71ms
WARN meta_service::raft::store::state: apply batch slow: total=10.10ms route=0.07ms set_last_applied=10.02ms

set_last_applied does only one thing: write a single record to RocksDB. RocksDB monitoring shows the actual write latency is < 1ms.
The extra 5~10ms is tokio scheduling wait time — the task re-queues after await and waits for a worker thread to become available.
At the same time, on the MQTT Broker side for connection handling:
WARN mqtt_broker::mqtt::connect: slow total=51.11ms session_process=50.91ms
WARN mqtt_broker::mqtt::connect: slow total=31.47ms session_process=31.32ms

session_process = gRPC round-trip + raft consensus + apply execution. The scheduling jitter inside raft apply is amplified through the gRPC chain into 30~51ms connection latency.
Root cause: tokio worker threads are exhausted by high-concurrency MQTT connections, causing apply tasks to queue up waiting for scheduling.
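This kind of queueing delay is easy to reproduce and measure outside tokio: stamp each task when it is enqueued and read the clock again when a worker picks it up; the difference is pure scheduling wait, independent of the work itself. A minimal std-only sketch of that measurement (a single worker thread stands in for a saturated tokio pool; all names are illustrative):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// A task carries its enqueue timestamp so the worker can compute
// how long it sat in the queue before being scheduled.
struct TimedTask {
    enqueued_at: Instant,
    work: Box<dyn FnOnce() + Send>,
}

// Run the closures on a single worker and return the worst queueing delay.
fn max_queue_delay(tasks: Vec<Box<dyn FnOnce() + Send>>) -> Duration {
    let (tx, rx) = mpsc::channel::<TimedTask>();
    let worker = thread::spawn(move || {
        let mut max = Duration::ZERO;
        for task in rx {
            // Time spent waiting for a worker, not doing work:
            max = max.max(task.enqueued_at.elapsed());
            (task.work)();
        }
        max
    });
    for work in tasks {
        tx.send(TimedTask { enqueued_at: Instant::now(), work }).unwrap();
    }
    drop(tx);
    worker.join().unwrap()
}

fn main() {
    // Each task's own work is a constant 1ms, but with one worker the
    // later tasks queue up, so observed latency grows with depth.
    let tasks: Vec<Box<dyn FnOnce() + Send>> = (0..8)
        .map(|_| Box::new(|| thread::sleep(Duration::from_millis(1))) as Box<dyn FnOnce() + Send>)
        .collect();
    println!("max queueing delay: {:?}", max_queue_delay(tasks));
}
```

With one worker and eight 1ms tasks, the last task's queueing delay approaches 7ms even though its own work is unchanged — the same shape as the set_last_applied numbers above.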
Why Not a Full Migration
Problems with fully replacing the runtime (tokio → compio):
- Extremely high migration cost: the network layer, gRPC services, and all async code are deeply tied to the tokio ecosystem; Iggy spent nearly two years on this
- Ecosystem fragmentation: tonic, hyper, tracing, etc. cannot be used directly on compio — the entire tech stack needs to be rebuilt
- Uncontrollable risk: system stability is hard to guarantee during a full migration
More fundamentally: RobustMQ's network layer and business logic layer themselves are not the problem. tokio is excellent at high-concurrency network I/O — MQTT connection management, protocol parsing, subscription matching, and message routing are all tokio's strengths. The jitter is concentrated in the storage write path.
Use a scalpel, not a bulldozer.
Hybrid Architecture Approach
Overall Structure
┌──────────────────────────────────────────────────────────────────┐
│ tokio runtime (main runtime) │
│ │
│ MQTT Broker meta-service │
│ ───────────── ───────────────────── │
│ Network connection mgmt gRPC server │
│ MQTT protocol parsing raft consensus logic ─── channel ──►│
│ Subscription routing Business cache updates │
│ Message push (tokio task) ◄── oneshot ───│
│ │
└──────────────────────────────────────────────────────────────────┘
│
┌────────────────────────────────────────────────────────┘
│ Dedicated storage thread pool
│
│ meta-service path: std::thread + spawn_blocking
│ thread-0 thread-1 thread-2 ...
│ │ RocksDB write │ RocksDB write │ ...
│ │ (synchronous call, but does not occupy tokio worker)
│
│ storage engine path: compio + io_uring
│ thread-0 (core 0) thread-1 (core 1) ...
│ ┌────────────────┐ ┌────────────────┐
│ │ compio runtime │ │ compio runtime │
│ │ io_uring loop │ │ io_uring loop │
│ │ file.write() │ │ file.write() │ ← true completion-based I/O
│ └────────────────┘ └────────────────┘
└──────────────────────────────────────────────────────

Both parts run in the same process and communicate via channels:
- The tokio side sends write requests and uses oneshot::channel to wait for results
- Storage threads complete the operation and reply via oneshot
- No shared state, no locks
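In synchronous miniature, the handshake looks like this (std channels only; a plain mpsc channel stands in for tokio's oneshot, and all names are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// A write request carries its payload and a dedicated reply channel,
// mirroring the tokio-side oneshot handshake.
struct WriteRequest {
    payload: Vec<u8>,
    reply: mpsc::Sender<Result<u64, String>>,
}

// The storage thread owns all state (here just a running offset);
// nothing is shared, so no locks are needed.
fn spawn_storage_thread(rx: mpsc::Receiver<WriteRequest>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut offset = 0u64;
        for req in rx {
            // Stand-in for the real write (RocksDB put / file append)
            offset += req.payload.len() as u64;
            let _ = req.reply.send(Ok(offset));
        }
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = spawn_storage_thread(rx);
    // "tokio side": send a request, then wait on the per-request reply channel
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(WriteRequest { payload: b"hello".to_vec(), reply: reply_tx }).unwrap();
    let offset = reply_rx.recv().unwrap().unwrap();
    println!("acked at offset {offset}"); // prints "acked at offset 5"
    drop(tx); // closing the request channel shuts the storage thread down
    handle.join().unwrap();
}
```

In the real system the reply side is async (`reply_rx.await`), so the tokio task yields its worker instead of blocking while the storage thread works.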
Key Distinction: Two Storage Paths, Different Solutions
An important distinction must be made clear here: meta-service (RocksDB) and the storage engine (file writes) call for different approaches, and the sources of benefit are completely different.
meta-service Path: spawn_blocking Is Enough
The Problem
The jitter in set_last_applied comes from tokio scheduling wait, not from slow RocksDB. Evidence: RocksDB monitoring shows < 1ms, but timing shows 5~11ms.
io_uring has almost no role to play here.
The reason: RocksDB is a C++ library called via rocksdb-rs, and internally uses traditional pwrite/fsync system calls. We cannot replace its internal I/O with io_uring from the outside (unless we deeply modify RocksDB through its Env interface to customize the I/O backend, which is extremely complex).
When calling db.put_cf(), regardless of what executor is wrapped around it, the internal write is synchronous.
What's Actually Needed: Move RocksDB Calls Off the tokio Worker Thread
Using tokio::task::spawn_blocking solves 80% of the problem:
// Before: synchronous RocksDB call on tokio worker thread
// Under high concurrency, workers are saturated and apply tasks queue up
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
self.db.put_cf(&cf, KEY_LAST_APPLIED, encode(&log_id)?)?;
Ok(())
}
// After: move the RocksDB call to the blocking thread pool;
// the tokio worker is released immediately and no longer competes for scheduling
async fn set_last_applied(&self, log_id: LogId) -> Result<(), StorageError> {
    let db = self.db.clone();
    let cf = self.cf.clone(); // the column family handle must be owned by the move closure
    let value = encode(&log_id)?;
    tokio::task::spawn_blocking(move || {
        db.put_cf(&cf, KEY_LAST_APPLIED, &value)
    })
    .await??;
    Ok(())
}

spawn_blocking hands the call off to tokio's blocking thread pool (512 threads by default); the current tokio worker is immediately freed to handle other tasks, eliminating the scheduling backlog.
If you want more precise control (e.g., a fixed number of threads, CPU affinity, or avoiding exhaustion of tokio's blocking thread pool by other callers), you can build your own thread pool:
// Custom fixed-size RocksDB write thread pool.
// `Error` is the caller's error type; `PoolShutdown` is an assumed variant.
pub struct WriteTask {
    f: Box<dyn FnOnce() -> Result<(), Error> + Send>,
    reply: oneshot::Sender<Result<(), Error>>,
}

impl WriteTask {
    fn execute(self) {
        let _ = self.reply.send((self.f)());
    }
}

pub struct RocksDBWritePool {
    tx: std::sync::mpsc::SyncSender<WriteTask>,
}

impl RocksDBWritePool {
    pub fn start(num_threads: usize) -> Arc<Self> {
        let (tx, rx) = std::sync::mpsc::sync_channel(8192);
        let rx = Arc::new(std::sync::Mutex::new(rx));
        for _ in 0..num_threads {
            let rx = rx.clone();
            std::thread::spawn(move || loop {
                // Take the task out first so the mutex is released before the
                // task runs; matching directly on `rx.lock().unwrap().recv()`
                // would hold the lock across execute() and serialize all workers
                let task = rx.lock().unwrap().recv();
                match task {
                    Ok(task) => task.execute(),
                    Err(_) => return, // all senders dropped: shut down
                }
            });
        }
        Arc::new(Self { tx })
    }

    pub async fn write(&self, f: impl FnOnce() -> Result<(), Error> + Send + 'static)
        -> Result<(), Error>
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx
            .send(WriteTask { f: Box::new(f), reply: reply_tx })
            .map_err(|_| Error::PoolShutdown)?;
        reply_rx.await.map_err(|_| Error::PoolShutdown)?
    }
}

Source of benefit: no longer blocking tokio workers, which eliminates the scheduling wait. The speed of the RocksDB writes themselves does not change.
storage engine Path: This Is Where io_uring Shines
Why the storage engine Is Different
The core path of the storage engine is sequential file writes: messages are written to segment files, each time appending data to the end. Here we have full control over the file fd, and can use io_uring's file I/O interface directly.
How tokio handles file I/O:
- tokio::fs::write → internally uses spawn_blocking → the blocking thread pool executes a synchronous write()
- In essence: thread concurrency simulating I/O concurrency; each concurrent write occupies one thread
How io_uring handles it:
- Submit an operation descriptor (which fd to write, what data, at which offset) to the submission queue
- The kernel executes asynchronously, placing results in the completion queue
- A single thread can have hundreds of I/O operations pending simultaneously, without needing to allocate a thread per operation
This difference is highly significant in sequential write scenarios: when multiple producers simultaneously append messages to the same segment file, io_uring can batch-submit multiple write operations to the kernel, which schedules and merges them, dramatically reducing system call counts and context switches.
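The syscall-count effect can be illustrated even without io_uring: coalescing pending appends into one buffer already collapses N write calls into one, and io_uring extends the same idea to whole batches of independent operations submitted together. A std-only sketch (file paths and function names are illustrative):

```rust
use std::fs::File;
use std::io::Write;

// Naive: one write(2) syscall per message (write_all normally issues one).
fn append_each(file: &mut File, msgs: &[&[u8]]) -> std::io::Result<usize> {
    let mut syscalls = 0;
    for msg in msgs {
        file.write_all(msg)?;
        syscalls += 1;
    }
    Ok(syscalls)
}

// Batched: coalesce all pending messages and issue a single write.
fn append_batched(file: &mut File, msgs: &[&[u8]]) -> std::io::Result<usize> {
    let buf: Vec<u8> = msgs.concat();
    file.write_all(&buf)?;
    Ok(1)
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    let mut naive = File::create(dir.join("segment-naive.log"))?;
    let mut batched = File::create(dir.join("segment-batched.log"))?;
    let msgs: Vec<&[u8]> =
        vec![b"m1".as_slice(), b"m2".as_slice(), b"m3".as_slice(), b"m4".as_slice()];
    // Same bytes land in both files, but with 4 syscalls vs 1
    println!("naive:   {} writes", append_each(&mut naive, &msgs)?); // 4
    println!("batched: {} writes", append_batched(&mut batched, &msgs)?); // 1
    Ok(())
}
```

io_uring makes this trade without the copy: the operations stay independent (different buffers, even different files) yet still reach the kernel in one submission.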
compio + thread-per-core
Storage engine threads use compio (io_uring executor) + CPU affinity binding:
pub struct FileWriteExecutor {
shards: Vec<std::sync::mpsc::SyncSender<FileTask>>,
}
impl FileWriteExecutor {
pub fn start(num_threads: usize) -> Arc<Self> {
let mut shards = Vec::with_capacity(num_threads);
for core_id in 0..num_threads {
let (tx, rx) = std::sync::mpsc::sync_channel::<FileTask>(8192);
shards.push(tx);
std::thread::Builder::new()
.name(format!("storage-io-{}", core_id))
.spawn(move || {
// Pin to a fixed CPU core to keep page cache and TLB warm
core_affinity::set_for_current(CoreId { id: core_id }).ok();
// compio runtime = io_uring event loop
compio::runtime::Runtime::new()
.unwrap()
.block_on(file_write_thread_main(rx));
})
.unwrap();
}
Arc::new(Self { shards })
}
/// partition_id determines which shard to use; writes to the same partition are ordered
pub async fn append(
&self,
partition_id: u64,
path: PathBuf,
data: Bytes,
) -> Result<u64, IoError> {
let shard = (partition_id as usize) % self.shards.len();
let (reply_tx, reply_rx) = oneshot::channel();
        // Note: send on a full sync_channel blocks, which would stall the
        // calling tokio worker; try_send (shedding load) or a tokio mpsc
        // channel avoids that under sustained overload
        self.shards[shard]
            .send(FileTask::Append { path, data, reply: reply_tx })
            .map_err(|_| IoError::ExecutorShutdown)?;
reply_rx.await.map_err(|_| IoError::ReplyDropped)?
}
}
async fn file_write_thread_main(rx: std::sync::mpsc::Receiver<FileTask>) {
    // Share the receiver behind a mutex so it can be handed to spawn_blocking
    // on every iteration (moving `rx` itself into the closure would only
    // compile for a single loop pass)
    let rx = std::sync::Arc::new(std::sync::Mutex::new(rx));
    loop {
        // Receive the next task without blocking the io_uring event loop
        let rx = rx.clone();
        let task = compio::runtime::spawn_blocking(move || rx.lock().unwrap().recv())
            .await
            .unwrap();
        match task {
            Ok(FileTask::Append { path, data, reply }) => {
                // io_uring asynchronous file append:
                // the kernel owns the buffer until the write completes,
                // and the application awaits the completion event
                let result = async {
                    let file = compio::fs::OpenOptions::new()
                        .create(true)
                        .append(true)
                        .open(&path)
                        .await?;
                    // Append offset = current file length; each shard owns its
                    // segment files exclusively, so this does not race
                    let offset = file.metadata().await?.len();
                    // compio returns a BufResult (I/O result, buffer handed back);
                    // take the result and let the buffer drop
                    file.write_all_at(data, offset).await.0?;
                    Ok::<u64, std::io::Error>(offset)
                }
                .await
                .map_err(IoError::Io);
                let _ = reply.send(result);
            }
            Err(_) => return, // executor dropped all senders: shut down
        }
    }
}

Source of benefit: true completion-based async I/O, plus CPU affinity that reduces cache misses, plus batched I/O submission that reduces system calls.
Why the Hybrid Architecture Has Limitations
Understanding the limitations of this approach and the gap compared to Iggy's full migration is just as important as understanding what it can solve.
Limitation 1: Treating Symptoms, Not the Root Cause
The true root cause is that tokio worker threads are exhausted by high-concurrency connections. The hybrid architecture prevents storage paths from occupying tokio workers, but the business logic inside apply (decode, cache updates, and other await points) still runs on tokio. As the connection count continues to grow, other await points will also experience scheduling wait. This is relieving tokio's burden, not architecturally eliminating non-determinism.
Limitation 2: Two Runtimes Coexisting — Complex Resource Scheduling
Running tokio and compio simultaneously in the same process means CPU core allocation needs careful planning: io_uring threads are pinned to specific cores, but the OS may still schedule tokio worker threads onto those same cores, creating contention. The total thread count increases (tokio workers + io_uring threads + RocksDB compaction threads), and in some cases this can actually worsen performance through thread contention.
Limitation 3: Overhead at the tokio/io_uring Boundary
Each tokio → storage thread communication involves: mpsc::send + await oneshot + thread wakeup. This overhead is on the order of 1~5μs, which is not significant individually but accumulates on high-frequency write paths. After Iggy's full migration, this boundary does not exist at all.
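The boundary cost is easy to measure in isolation: an echo thread behind a channel gives the send + wakeup + reply round-trip time directly. A std-only microbenchmark sketch (the exact μs figure will vary with machine and load; names are illustrative):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Average request/reply round-trip across a thread boundary:
// mpsc send + worker wakeup + reply recv, mirroring the tokio -> storage hop.
fn measure_roundtrip(iters: u32) -> Duration {
    let (req_tx, req_rx) = mpsc::sync_channel::<mpsc::SyncSender<()>>(1);
    let worker = thread::spawn(move || {
        for reply in req_rx {
            let _ = reply.send(()); // echo back immediately: zero "work"
        }
    });
    let start = Instant::now();
    for _ in 0..iters {
        // One per-request reply channel, like the oneshot in the real path
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        req_tx.send(reply_tx).unwrap();
        reply_rx.recv().unwrap();
    }
    let avg = start.elapsed() / iters;
    drop(req_tx);
    worker.join().unwrap();
    avg
}

fn main() {
    println!("avg cross-thread round-trip: {:?}", measure_roundtrip(10_000));
}
```

Since the worker does no work, everything measured is boundary overhead; on typical hardware this lands in the low single-digit μs, consistent with the 1~5μs estimate above.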
Limitation 4: io_uring Has No Effect on RocksDB
As mentioned earlier, io_uring provides no direct benefit for the RocksDB path. The improvement in meta-service comes from "not blocking tokio workers," not from an upgraded I/O model. After Iggy's full migration to compio, all paths (including network I/O) benefit from io_uring + thread-per-core, whereas our approach only gets the full io_uring benefit on the file write path.
Comparison with Iggy's Full Migration
| Dimension | Iggy (full migration) | RobustMQ (hybrid architecture) |
|---|---|---|
| Runtime | tokio → compio (full replacement) | tokio retained, storage layer extracted |
| RocksDB path improvement | Synchronous call, does not block tokio worker | Synchronous call, does not block tokio worker (same) |
| File I/O improvement | Full-chain io_uring, truly async | io_uring within storage threads, partial benefit |
| Network path improvement | Full-chain io_uring + thread-per-core | None (tokio unchanged) |
| Scheduling jitter elimination | Full-chain elimination | Eliminated on storage path, still present in business logic |
| Migration cost | Very high (~2 years) | Low (incremental, path-by-path) |
| Ecosystem compatibility | Needs to be rebuilt | tonic, tracing fully preserved |
| Engineering risk | High | Low |
In short: Iggy's approach has a higher theoretical ceiling; the hybrid architecture has lower engineering cost and lower risk. Which to choose depends on the system's scale and team resources. For RobustMQ at its current stage, the hybrid architecture is the more pragmatic path.
Expected Outcomes
Based on our analysis of the problem (not a direct application of Iggy's data, since the scenarios are not identical):
meta-service (spawn_blocking refactor):
- Scheduling wait in set_last_applied drops from 5~11ms to near 0
- session_process shows a notable decrease from 30~51ms (by eliminating the scheduling-wait portion of raft apply)
- Latency from gRPC round-trips and raft consensus itself remains unchanged
storage engine (io_uring refactor):
- P99/P999 latency distribution narrows under high-concurrency writes
- Sequential write throughput increases due to batched I/O submission
- Specific magnitude needs to be validated through stress testing
Next Steps
The plan is still in the design phase and not fully implemented. The planned sequence:
- Minimum validation: refactor set_last_applied with spawn_blocking and do an A/B comparison against the current implementation to verify that scheduling wait is actually eliminated
- meta-service refactor: complete the raft storage path (set_last_applied + append_entries + snapshot)
- storage engine refactor: switch segment file writes to compio + io_uring
- Stress test validation: end-to-end latency comparison under 1000/5000/10000 concurrent connections, examining tail latency distribution
The first step has very low cost and can quickly produce data. This path has a clear problem definition (our own logs) and a controllable migration scope — we will continue documenting progress.
