RobustMQ MQTT Performance Tuning: From DashMap Nested Lock Contention to ArcSwap

Recently, the core infrastructure, components, and MQTT-related core features of RobustMQ have been largely completed, and we are now working on performance tuning and stability. This article documents how two performance problems in the RobustMQ MQTT Broker were diagnosed and fixed under high-concurrency connection scenarios: one involving write lock contention caused by nested DashMaps, and another involving hidden queuing from tokio::sync::RwLock under high-concurrency reads. The two problems look very different on the surface, but their root cause is the same: the wrong concurrency primitive was used on a high-concurrency path.

Background

When the RobustMQ MQTT Broker processes a client CONNECT request, it must complete multiple steps: connection limit check, authentication, building the connection object, session handling, writing to the local cache, and more. All of these steps are performed serially in the connect() function, with one async task per CONNECT request.

The benchmark scenario uses mqttx bench conn -c 1000 to simulate 1,000 clients initiating CONNECT simultaneously. The first round of benchmarking typically passes; starting from the second round, connection timeouts appear and large numbers of clients fail to connect.

Problem One: Write Lock Contention from Nested DashMap

Symptom

After adding diagnostic logging, the logs showed a large number of requests stuck at the add_connection step. The connection IDs entering this step were printed, but the logs for subsequent steps never appeared, indicating that something inside add_connection was waiting on a lock.

Root Cause Analysis

At the time, the data structures for connection and session information in MQTTCacheManager were nested DashMaps:

rust
// Before
pub connection_info: DashMap<String, DashMap<u64, MQTTConnection>>,
// outer key: tenant, inner key: connect_id

pub session_info: DashMap<String, DashMap<String, MqttSession>>,
// outer key: tenant, inner key: client_id

The write logic was:

rust
pub fn add_connection(&self, connect_id: u64, conn: MQTTConnection) {
    self.connection_info
        .entry(conn.tenant.clone())
        .or_default()           // holds outer shard write lock
        .insert(connect_id, conn); // operates on inner map while holding write lock
}

The problem is the entry().or_default() combination. DashMap shards keys internally by hash value, with each shard guarded by its own read/write lock. entry() takes the write lock on the shard containing the key, and the RefMut returned by or_default() keeps that lock held until it is dropped, which here is only after the inner insert() completes.

In the benchmark, 1,000 connections all used the same tenant ("default"), so all connections had the outer key "default", which hashes to the same shard. The result:

  • 1,000 concurrent tasks all call add_connection
  • All competing for the write lock on the same shard
  • Each write lock hold time = outer entry() + inner insert(), which takes a relatively long time
  • 1,000 tasks degrade to serial queuing

This is not a DashMap bug — it's a misuse. DashMap's sharding works extremely well when keys are spread out, but when all requests concentrate on the same key, it's equivalent to a single global write lock.
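
The hot-key effect can be reproduced with a toy model. The sketch below is std-only and does not use DashMap; `shard_index` and `shards_touched` are hypothetical helpers, and the modulo-based shard selection is an assumption that only mirrors the principle (same key, same shard), not DashMap's actual implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Toy shard selection: hash the key and reduce it modulo the shard count.
/// DashMap's real selection logic differs in detail; only the principle
/// (the same key always maps to the same shard) matters here.
fn shard_index<K: Hash>(key: &K, shard_count: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % shard_count
}

/// Counts how many distinct shards a batch of keys touches.
fn shards_touched<K: Hash>(keys: &[K], shard_count: usize) -> usize {
    keys.iter()
        .map(|key| shard_index(key, shard_count))
        .collect::<HashSet<_>>()
        .len()
}
```

Feeding 1,000 copies of the tenant name "default" into `shards_touched` reports a single shard, while 1,000 distinct u64 connection IDs spread over many shards, which is the difference the flattened layout exploits.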

There was also a more subtle deadlock risk: iterating over the outer map with iter() holds a shard read lock. If a task calls get_mut() on the inner map while still holding that read lock, and another task is waiting in entry() for the write lock on the same shard, the tasks can end up waiting on each other in a cycle.

Solution

Flatten the nested DashMap: key each primary map directly by a globally unique key, and keep a separate per-tenant index for tenant-scoped lookups:

rust
// After
// key: connect_id (globally unique u64)
pub connection_info: DashMap<u64, MQTTConnection>,
pub tenant_connection_index: DashMap<String, DashSet<u64>>,

// key: "{tenant}/{client_id}" (client_id is unique only within a tenant, so the tenant is part of the key)
pub session_info: DashMap<String, MqttSession>,
pub tenant_session_index: DashMap<String, DashSet<String>>,

add_connection becomes two independent direct insertions:

rust
pub fn add_connection(&self, connect_id: u64, conn: MQTTConnection) {
    self.tenant_connection_index
        .entry(conn.tenant.clone())
        .or_default()
        .insert(connect_id);         // DashSet write lock held for an extremely short time
    self.connection_info.insert(connect_id, conn); // O(1) direct insert
}

The key change: the key of connection_info is connect_id (u64). With 1,000 connections there are 1,000 different keys, spread roughly evenly across all shards. The chance that two given inserts land on the same shard drops from 100% to approximately 1/shard_count (DashMap's default shard count is derived from the number of available CPU cores and rounded up to a power of two, so it varies by machine). Write lock hold time is also extremely short: just one HashMap insert, not an insert plus an inner map operation.

tenant_connection_index still groups by tenant, but DashSet<u64> write operations are extremely fast, and contention only exists on a small number of tenants — it will not become a bottleneck.
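
A flattened layout means removal and tenant-scoped listing have to touch both the primary map and the index. The sketch below models that with std types only: `RwLock<HashMap>` stands in for DashMap and `HashSet` for DashSet, and `ConnCache`, `Conn`, `remove_connection`, and `list_by_tenant` are hypothetical names, not RobustMQ's actual API.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

#[derive(Clone, Debug, PartialEq)]
struct Conn {
    tenant: String,
    client_id: String,
}

/// std-only stand-in for the flattened layout: a primary map keyed by
/// connect_id plus a per-tenant index of connect_ids.
#[derive(Default)]
struct ConnCache {
    connections: RwLock<HashMap<u64, Conn>>,
    tenant_index: RwLock<HashMap<String, HashSet<u64>>>,
}

impl ConnCache {
    /// Mirrors the two independent insertions from the article.
    fn add_connection(&self, connect_id: u64, conn: Conn) {
        self.tenant_index
            .write()
            .unwrap()
            .entry(conn.tenant.clone())
            .or_default()
            .insert(connect_id);
        self.connections.write().unwrap().insert(connect_id, conn);
    }

    /// Hypothetical removal: drop the primary entry, then clean up the index.
    fn remove_connection(&self, connect_id: u64) -> Option<Conn> {
        let conn = self.connections.write().unwrap().remove(&connect_id)?;
        let mut index = self.tenant_index.write().unwrap();
        let now_empty = match index.get_mut(&conn.tenant) {
            Some(ids) => {
                ids.remove(&connect_id);
                ids.is_empty()
            }
            None => false,
        };
        if now_empty {
            index.remove(&conn.tenant);
        }
        Some(conn)
    }

    /// Tenant-scoped listing goes through the index, returning sorted ids.
    fn list_by_tenant(&self, tenant: &str) -> Vec<u64> {
        let mut ids: Vec<u64> = self
            .tenant_index
            .read()
            .unwrap()
            .get(tenant)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable();
        ids
    }
}
```

The two structures are updated one after the other rather than atomically, so a reader may briefly see an id in the index before the connection is in the primary map; callers that list by tenant should tolerate a missing primary entry.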

The same change was applied to session_list and topic_list in NodeCacheManager in broker-core:

rust
// Before
pub topic_list: DashMap<String, DashMap<String, Topic>>,    // (tenant, (topic_name, Topic))
pub session_list: DashMap<String, DashMap<String, MqttSession>>, // (tenant, (client_id, Session))

// After
pub topic_list: DashMap<String, Topic>,                     // key: "{tenant}/{topic_name}"
pub topic_tenant_index: DashMap<String, DashSet<String>>,
pub session_list: DashMap<String, MqttSession>,             // key: "{tenant}/{client_id}"
pub session_tenant_index: DashMap<String, DashSet<String>>,
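
The composite keys above could be built and parsed with helpers along these lines. This is a minimal sketch: `composite_key` and `split_composite_key` are hypothetical names, and splitting at the first '/' assumes tenant names never contain '/' (topic names may).

```rust
/// Builds a flattened map key in the "{tenant}/{name}" form.
fn composite_key(tenant: &str, name: &str) -> String {
    format!("{}/{}", tenant, name)
}

/// Splits a composite key at the first '/'.
/// Assumption: tenant names never contain '/'; the remainder may.
fn split_composite_key(key: &str) -> Option<(&str, &str)> {
    key.split_once('/')
}
```

If tenant names were allowed to contain '/', the key format would need a different separator or an escaping rule to stay unambiguous.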

Effect

Before the fix, diagnostic logs showed large numbers of requests stuck at add_connection, with subsequent steps never outputting:

[connect] add_session start connect_id=1023
[connect] add_connection start connect_id=1023
# ← long silence, st_report start never appears

After the fix, mqttx bench conn -c 1000 runs through multiple consecutive rounds without timeouts; add_connection is no longer a bottleneck.


Problem Two: tokio RwLock Queuing Effect Under High-Concurrency Reads

Symptom

After the first problem was fixed, some CONNECT requests still took more than 30ms. After adding timing to each step in the connect() function, the logs showed output like this:

WARN [connect] slow connect_id=32588 total=46.24ms
    get_cluster=18.05ms check_limit=0.24ms build_conn=0.10ms
    session_process=27.52ms ...

WARN [connect] slow connect_id=29773 total=30.11ms
    get_cluster=2.17ms check_limit=1.36ms build_conn=0.14ms
    login_check=2.22ms session_process=23.87ms ...

Sometimes get_cluster was slow, sometimes check_limit, sometimes build_conn. The slow step was not consistent, yet under normal conditions none of these steps should be slow — they're just reading a configuration object with no I/O whatsoever.

Another pattern was that slow requests tended to appear in batches — multiple connect_ids slow at the same time within a single second, rather than randomly scattered.

Root Cause Analysis

get_cluster_config, check_limit (which internally calls get_cluster_config), and build_conn (which also internally calls get_cluster_config) all access the same data structure:

rust
pub cluster_config: Arc<RwLock<BrokerConfig>>,

Each read requires .await to acquire the read lock:

rust
pub async fn get_cluster_config(&self) -> BrokerConfig {
    self.cluster_config.read().await.clone()
}

tokio::sync::RwLock is a fair lock with writer preference. This means:

  1. Under normal conditions, multiple readers concurrently hold the read lock without blocking each other
  2. Once a writer is waiting (even from an occasional cluster config update), all new read requests queue up to wait for the write lock to finish first
  3. After the write lock is released, the accumulated read requests flood in all at once, and each read request needs to go through tokio's async scheduler (yield + wake) — even though the read operation itself only takes nanoseconds

During the benchmark, 1,000 concurrent connections each call get_cluster_config 3 to 4 times, so at any given moment hundreds of read requests may be waiting in the async scheduler. Any cluster config write operation (heartbeat sync, node registration, dynamic config push) triggers a large-scale batch queuing of read requests.

The fact that the slow step wasn't consistent is precisely because all these steps call get_cluster_config; whichever step happens to be blocked shows up as slow.
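
The queuing effect can be illustrated with a toy model of first-in, first-out fairness. This is not tokio's implementation; `Req`, `fifo_batches`, and `delayed_readers` are hypothetical names, and the model only captures the ordering rule that a reader arriving after a waiting writer cannot jump ahead of it.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Req {
    Read,
    Write,
}

/// Groups queued requests into batches that may hold the lock at the same
/// time under strict FIFO fairness: consecutive readers share a batch,
/// and a writer always gets a batch of its own.
fn fifo_batches(arrivals: &[Req]) -> Vec<Vec<Req>> {
    let mut batches: Vec<Vec<Req>> = Vec::new();
    for &req in arrivals {
        let joins_last = req == Req::Read
            && batches
                .last()
                .map_or(false, |batch| batch.iter().all(|r| *r == Req::Read));
        if joins_last {
            batches.last_mut().unwrap().push(req);
        } else {
            batches.push(vec![req]);
        }
    }
    batches
}

/// Number of readers that cannot run in the first batch, i.e. readers
/// that end up queued behind at least one earlier request.
fn delayed_readers(arrivals: &[Req]) -> usize {
    fifo_batches(arrivals)
        .iter()
        .skip(1)
        .flatten()
        .filter(|r| **r == Req::Read)
        .count()
}
```

For the arrival order read, read, write, read, read, read, the model yields three batches, and the three readers that arrived after the writer are all delayed even though each read is trivial on its own.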

Comparing three locking approaches:

| Approach | Read Operation | Write Operation | Suitable Scenario |
|---|---|---|---|
| tokio::sync::RwLock<T> | async, readers yield, scheduling overhead under high concurrency; writer-preference policy causes read queuing | async | Read and write frequencies are similar |
| std::sync::RwLock<T> | synchronous, no yield, but blocks the thread | synchronous, blocks thread | Lock held for extremely short time with no async code |
| ArcSwap<T> | lock-free, atomic pointer read, no queuing, no scheduling overhead | atomically replaces the entire pointer | Reads vastly outnumber writes, whole-value replacement |

The usage pattern of BrokerConfig is the classic "reads vastly outnumber writes, each write is a whole-value replacement": cluster config is loaded at startup and occasionally updated through the admin interface, with each update being a full replacement rather than a partial modification. ArcSwap is a perfect fit for this scenario.

Solution

Change cluster_config from Arc<RwLock<BrokerConfig>> to ArcSwap<BrokerConfig>:

rust
// Before
pub cluster_config: Arc<RwLock<BrokerConfig>>,

// After
pub cluster_config: ArcSwap<BrokerConfig>,

Read and write operations change from async to synchronous:

rust
// Before
pub async fn get_cluster_config(&self) -> BrokerConfig {
    self.cluster_config.read().await.clone()
}
pub async fn set_cluster_config(&self, config: BrokerConfig) {
    let mut data = self.cluster_config.write().await;
    *data = config;
}

// After
pub fn get_cluster_config(&self) -> BrokerConfig {
    self.cluster_config.load().as_ref().clone()
}
pub fn set_cluster_config(&self, config: BrokerConfig) {
    self.cluster_config.store(Arc::new(config));
}

ArcSwap::load() is lock-free: it reads the current pointer atomically and returns a lightweight guard, with no lock to acquire, no yield, and no scheduler involvement. 1,000 concurrent reads do not queue behind each other or behind a writer. store() atomically swaps in the new pointer; readers that already loaded the old value keep using it until they drop it, so writes do not block reads.
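
The whole-value replacement semantics can be shown with a std-only stand-in. The sketch below uses `RwLock<Arc<T>>` and is therefore not lock-free like ArcSwap; `SnapshotCell` and the `max_connections` field are hypothetical and exist only to show that a reader's snapshot stays valid after a store.

```rust
use std::sync::{Arc, RwLock};

#[derive(Clone, Debug, PartialEq)]
struct BrokerConfig {
    max_connections: u32, // hypothetical field for illustration
}

/// std-only stand-in for ArcSwap<T>. The lock is held only long enough to
/// clone or replace the Arc pointer; unlike ArcSwap it is not lock-free.
struct SnapshotCell<T> {
    current: RwLock<Arc<T>>,
}

impl<T> SnapshotCell<T> {
    fn new(value: T) -> Self {
        Self { current: RwLock::new(Arc::new(value)) }
    }

    /// Returns a snapshot that remains valid even if the cell is replaced.
    fn load(&self) -> Arc<T> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Replaces the whole value; existing snapshots are not affected.
    fn store(&self, value: Arc<T>) {
        *self.current.write().unwrap() = value;
    }
}
```

A reader that loaded the config before a store keeps seeing the old values until it loads again, which is usually fine for configuration but worth keeping in mind for long-running tasks that cache the snapshot.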

Dynamic config updates (update_cluster_dynamic_config) are changed to a load → clone → modify fields → store pattern:

rust
pub fn update_cluster_dynamic_config(
    node_cache: &Arc<NodeCacheManager>,
    resource_type: ClusterDynamicConfig,
    config: Bytes,
) -> Result<(), CommonError> {
    let mut new_config = node_cache.get_cluster_config(); // atomic read, clone
    match resource_type {
        ClusterDynamicConfig::MqttProtocol => {
            new_config.mqtt_protocol = serde_json::from_slice(&config)?;
        }
        // ...
    }
    node_cache.set_cluster_config(new_config); // atomic replace
    Ok(())
}

This pattern is open to a lost-update race: if two writers load the same snapshot concurrently, the later store() overwrites the earlier writer's change. For an extremely low-frequency write scenario like cluster configuration, this risk is acceptable.
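
The race can be replayed deterministically. The sketch below is std-only (a `Mutex<Arc<Config>>` stand-in rather than ArcSwap), and `ConfigCell`, `update`, and both config fields are hypothetical. It first replays the interleaving that loses an update, then shows one possible mitigation: performing the read-modify-write under a single lock.

```rust
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, PartialEq)]
struct Config {
    mqtt_keepalive: u32,  // hypothetical field
    max_packet_size: u32, // hypothetical field
}

struct ConfigCell {
    current: Mutex<Arc<Config>>,
}

impl ConfigCell {
    fn new(config: Config) -> Self {
        Self { current: Mutex::new(Arc::new(config)) }
    }
    fn load(&self) -> Arc<Config> {
        Arc::clone(&*self.current.lock().unwrap())
    }
    fn store(&self, config: Config) {
        *self.current.lock().unwrap() = Arc::new(config);
    }
    /// Read-modify-write under one lock, so updates cannot overwrite each other.
    fn update(&self, modify: impl FnOnce(&mut Config)) {
        let mut guard = self.current.lock().unwrap();
        let mut next = (**guard).clone();
        modify(&mut next);
        *guard = Arc::new(next);
    }
}

/// Replays the interleaving where two writers load the same snapshot.
fn lost_update_demo() -> Config {
    let cell = ConfigCell::new(Config { mqtt_keepalive: 60, max_packet_size: 1024 });
    let mut a = (*cell.load()).clone(); // writer A loads
    let mut b = (*cell.load()).clone(); // writer B loads the same snapshot
    a.mqtt_keepalive = 120;
    b.max_packet_size = 4096;
    cell.store(a);
    cell.store(b); // overwrites A's keepalive change
    (*cell.load()).clone()
}

/// The same two changes applied through the serialized update path.
fn serialized_update_demo() -> Config {
    let cell = ConfigCell::new(Config { mqtt_keepalive: 60, max_packet_size: 1024 });
    cell.update(|c| c.mqtt_keepalive = 120);
    cell.update(|c| c.max_packet_size = 4096);
    (*cell.load()).clone()
}
```

With ArcSwap itself, serializing writers behind a small mutex or using the crate's `rcu` method would be options to evaluate if config writes ever become more frequent.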

Effect

Before the fix, benchmark logs occasionally showed entire batches of requests where get_cluster, check_limit, and build_conn steps suddenly spiked to 10-20ms:

log
WARN [connect] slow connect_id=32588 total=46.24ms
    get_cluster=18.05ms check_limit=0.24ms build_conn=0.10ms
    session_process=27.52ms ...

WARN [connect] slow connect_id=29773 total=30.11ms
    get_cluster=2.17ms check_limit=1.36ms build_conn=0.14ms
    login_check=2.22ms session_process=23.87ms ...

WARN [connect] slow connect_id=53873 total=39.59ms
    get_cluster=0.07ms check_limit=31.03ms build_conn=0.11ms
    session_process=8.25ms ...

After the fix, all three steps stabilized at 0.00-0.10ms, with the occasional 10-20ms spikes completely gone:

log
WARN [connect] slow connect_id=10211 total=39.21ms
    get_cluster=0.00ms check_limit=0.05ms build_conn=0.01ms
    session_process=39.07ms ...

WARN [connect] slow connect_id=38411 total=41.74ms
    get_cluster=0.00ms check_limit=0.05ms build_conn=0.01ms
    session_process=41.59ms ...

All get_cluster_config calls had their .await removed, the related function signatures changed from async fn to regular fn, reducing unnecessary async scheduling points.


The Common Essence of Both Problems

These two problems look different on the surface, but they can be summarized in a single sentence: the wrong concurrency primitive was used on a hot, high-concurrency path.

The nested DashMap problem was this: "grouping by tenant" and "indexing by connect_id" were mixed into a single data structure, causing all requests under high concurrency to concentrate on the same lock. The fix was to flatten the data structure so that the lock granularity aligns with the actual granularity of business access.

The RwLock problem was this: a rarely-changing global config was placed inside a lock with async-wait semantics, introducing unnecessary scheduling overhead in a high-concurrency read scenario. The fix was to choose a primitive that matches the access pattern: when reads vastly outnumber writes and every write is a whole-value replacement, use ArcSwap.

The key to diagnosing both problems was adding fine-grained timing logs on the hot path, recording the duration of each step separately. Before having data, guessing where the bottleneck is tends to be wrong. With data, the root cause becomes very clear.
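
Per-step timing of this kind could be collected with a small helper. This is a minimal std-only sketch: `StepTimer`, `format_report`, and the threshold parameter are hypothetical, and the output only imitates the log format shown earlier rather than reproducing RobustMQ's logging code.

```rust
use std::time::{Duration, Instant};

/// Records how long each named step took since the previous mark.
struct StepTimer {
    started: Instant,
    last: Instant,
    steps: Vec<(&'static str, Duration)>,
}

impl StepTimer {
    fn new() -> Self {
        let now = Instant::now();
        Self { started: now, last: now, steps: Vec::new() }
    }

    /// Call right after a step finishes.
    fn mark(&mut self, name: &'static str) {
        let now = Instant::now();
        self.steps.push((name, now.duration_since(self.last)));
        self.last = now;
    }

    fn total(&self) -> Duration {
        self.last.duration_since(self.started)
    }

    /// Produces a log line only when the total reaches the threshold.
    fn report_if_slow(&self, connect_id: u64, threshold: Duration) -> Option<String> {
        if self.total() < threshold {
            return None;
        }
        Some(format_report(connect_id, self.total(), &self.steps))
    }
}

fn millis(d: Duration) -> f64 {
    d.as_micros() as f64 / 1000.0
}

/// Formats "step=1.23ms" pairs in the order the steps were recorded.
fn format_report(connect_id: u64, total: Duration, steps: &[(&'static str, Duration)]) -> String {
    let parts: Vec<String> = steps
        .iter()
        .map(|(name, d)| format!("{}={:.2}ms", name, millis(*d)))
        .collect();
    format!(
        "[connect] slow connect_id={} total={:.2}ms {}",
        connect_id,
        millis(total),
        parts.join(" ")
    )
}
```

Inside a handler, one would create the timer at entry, call `mark` after each step, and log the result of `report_if_slow` with a threshold such as 30ms, so that only slow requests produce output.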

There's nothing mysterious about performance optimization. Measure, locate, understand the root cause, choose the right tool, then verify. Iterate.


Appendix: The Process of Collaborating with AI

Both problems were diagnosed and fixed through a collaborative process with AI. The collaboration pattern itself is worth recording.

What AI Cannot Do

During the diagnosis phase, AI was largely unable to help.

At first I tried asking AI to directly analyze the code and identify where the performance bottleneck was. The result was a bunch of guesses that were almost entirely wrong, because performance problems are runtime behavior, not something that can be discovered through static code analysis. AI couldn't see that 1,000 connections all landed on the same shard during the benchmark, and it couldn't see the queuing effect of tokio RwLock's writer-preference policy under high concurrency.

Perceiving anomalies, forming hypotheses, designing experiments — these can only be done by humans.

The breakthrough for the first problem was: logs showed large numbers of requests stuck at add_connection; the connect_id values entering this step were printed, but the logs for subsequent steps never appeared. This anomaly was noticed by a human, not AI.

The second problem was more subtle. After fixing the first problem, some requests were still occasionally slow. After adding timing to each step in connect(), a strange pattern appeared in the logs: get_cluster, check_limit, and build_conn occasionally all became slow at the same time, and in batches. Under normal conditions, these steps should only be reading an in-memory object, so they shouldn't be slow. "These steps shouldn't be slow, but they're all slow together": this observation was made by a human.

What AI Is Good At

Once the problem was located, AI's value truly showed.

Knowledge retrieval: After describing tokio::sync::RwLock's behavior under writer-preference policy, and the symptom of "the read operation itself only takes nanoseconds yet it's still slow under high concurrency," AI immediately suggested ArcSwap and explained why it is the optimal choice in a "reads vastly outnumber writes, whole-value replacement" scenario. ArcSwap was not something I was familiar with before — AI recommended it.

Execution amplification: Once the direction was clear, all 19 files that called get_cluster_config().await had to be changed to the synchronous form, along with the type definitions, every call site, and the Cargo.toml dependencies. AI did these batch mechanical changes quickly and accurately, with no need to search manually file by file.

The Essence of This Collaboration Pattern

It's not "throw the problem at AI and let AI solve it," but rather:

Humans are responsible for perceiving anomalies, forming hypotheses, and designing experiments; AI is responsible for rapidly turning human thinking into code and knowledge.

The human is the driver of the entire debug process; AI is the execution amplifier. This division is especially clear for performance problems — without a human noticing "these steps shouldn't be slow but they're all slow together," AI would never find the RwLock issue.

Once a clear direction is established, AI's efficiency far surpasses doing things by hand. But the direction itself can only come from humans.
