RobustMQ MQTT 性能调优:从 DashMap 嵌套锁到 ArcSwap
近期,RobustMQ 的基础架构和组件、MQTT相关的核心功能基本都已开发完成。正在进行性能调优和稳定性优化。这篇文章记录 RobustMQ MQTT Broker 在高并发连接场景下的两次性能问题定位与修复:一次是嵌套 DashMap 引发的写锁竞争,另一次是 tokio::sync::RwLock 在高并发读场景下的隐性排队。两个问题表面看起来很不一样,根因却一致——不合适的锁原语用在了高并发路径上。
背景
RobustMQ 的 MQTT Broker 在处理客户端 CONNECT 请求时,需要完成多个步骤:连接限制检查、认证、构建连接对象、session 处理、写入本地缓存等。这些步骤全部在 connect() 函数中串行完成,每个 CONNECT 请求对应一个异步任务。
压测场景是使用 mqttx bench conn -c 1000 模拟 1000 个客户端同时发起 CONNECT。第一轮压测通常通过,第二轮开始出现连接超时,大量客户端连接失败。
问题一:嵌套 DashMap 的写锁死局
现象
加入诊断日志后,日志显示大量请求卡在 add_connection 这一步。进入这步的连接 ID 打印出来,但后续步骤的日志迟迟不出现,说明 add_connection 内部在等待某个锁。
根因分析
当时 MQTTCacheManager 中连接信息和 session 信息的数据结构是嵌套 DashMap:
// 改前
pub connection_info: DashMap<String, DashMap<u64, MQTTConnection>>,
// 外层 key: tenant,内层 key: connect_id
pub session_info: DashMap<String, DashMap<String, MqttSession>>,
// 外层 key: tenant,内层 key: client_id写入时的逻辑是:
pub fn add_connection(&self, connect_id: u64, conn: MQTTConnection) {
self.connection_info
.entry(conn.tenant.clone())
.or_default() // 持有外层 shard 写锁
.insert(connect_id, conn); // 在写锁持有期间操作内层 map
}问题出在 entry().or_default() 这个组合上。DashMap 内部按 key 的 hash 值分片(shard),每个 shard 有一把读写锁。entry() 会锁定 key 所在的 shard,返回 RefMut 持有该 shard 的写锁,直到 RefMut 被 drop 才释放。
压测时 1000 个连接使用同一个 tenant("default"),所有连接的外层 key 都是 "default",会 hash 到同一个 shard。结果:
- 1000 个并发任务同时调用
add_connection - 都在争同一个 shard 的写锁
- 每次写锁持有时间 = 外层
entry()+ 内层insert(),时间较长 - 1000 个任务退化为串行排队
这不是 DashMap 的 bug,是使用方式的问题。DashMap 的分片锁在 key 分散时效果极好,但当所有请求集中在同一个 key 时,等同于一把全局写锁。
此外还有一个更隐蔽的死锁风险:iter() 遍历外层 map 时持有 shard 读锁,如果在持有读锁期间试图 get_mut() 内层 map 的某个 entry,而同时另一个任务在 entry() 上等写锁,就会发生锁的循环等待。
解决方案
将嵌套 DashMap 扁平化,用复合 key 替代外层 map:
// 改后
// key: connect_id(全局唯一 u64)
pub connection_info: DashMap<u64, MQTTConnection>,
pub tenant_connection_index: DashMap<String, DashSet<u64>>,
// key: client_id(全局唯一,但不同 tenant 可能同名,用 "{tenant}/{client_id}")
pub session_info: DashMap<String, MqttSession>,
pub tenant_session_index: DashMap<String, DashSet<String>>,add_connection 变成两次独立的直接插入:
pub fn add_connection(&self, connect_id: u64, conn: MQTTConnection) {
self.tenant_connection_index
.entry(conn.tenant.clone())
.or_default()
.insert(connect_id); // DashSet 的写锁只持有极短时间
self.connection_info.insert(connect_id, conn); // O(1) 直接插入
}关键变化:connection_info 的 key 是 connect_id(u64),1000 个连接有 1000 个不同的 key,均匀散布在所有 shard 上。每个 shard 的竞争概率从 100% 降低到约 1/shard_count(DashMap 默认 16 或 32 个 shard)。写锁持有时间也极短——只是 HashMap 的一次 insert,而不是 insert + 内层 map 操作。
tenant_connection_index 虽然还是按 tenant 分组,但 DashSet<u64> 的写操作极快,且只在少数几个 tenant 上存在竞争,不会成为瓶颈。
同样的改动同步应用到 broker-core 的 NodeCacheManager 中的 session_list 和 topic_list:
// 改前
pub topic_list: DashMap<String, DashMap<String, Topic>>, // (tenant, (topic_name, Topic))
pub session_list: DashMap<String, DashMap<String, MqttSession>>, // (tenant, (client_id, Session))
// 改后
pub topic_list: DashMap<String, Topic>, // key: "{tenant}/{topic_name}"
pub topic_tenant_index: DashMap<String, DashSet<String>>,
pub session_list: DashMap<String, MqttSession>, // key: "{tenant}/{client_id}"
pub session_tenant_index: DashMap<String, DashSet<String>>,效果
修复前,诊断日志显示大量请求卡在 add_connection,后续步骤迟迟不输出:
[connect] add_session start connect_id=1023
[connect] add_connection start connect_id=1023
# ← 长时间无输出,st_report start 迟迟不出现修复后 mqttx bench conn -c 1000 连续多轮压测均无超时,add_connection 不再是瓶颈。
问题二:tokio RwLock 在高并发读下的排队效应
现象
在第一个问题修复后,仍然有部分 CONNECT 请求耗时超过 30ms。在 connect() 函数的各个步骤加上计时后,日志输出类似:
WARN [connect] slow connect_id=32588 total=46.24ms
get_cluster=18.05ms check_limit=0.24ms build_conn=0.10ms
session_process=27.52ms ...
WARN [connect] slow connect_id=29773 total=30.11ms
get_cluster=2.17ms check_limit=1.36ms build_conn=0.14ms
login_check=2.22ms session_process=23.87ms ...有时是 get_cluster 慢,有时是 check_limit 慢,有时是 build_conn 慢。慢的步骤不固定,但这些步骤在正常情况下都不应该慢——它们只是读一个配置对象,没有任何 I/O。
另一个规律是:慢的请求往往成批出现,同一秒内多个 connect_id 集体慢,而不是随机分散。
根因分析
get_cluster_config、check_limit(内部调用 get_cluster_config)、build_conn(内部调用 get_cluster_config)背后访问的是同一个数据结构:
pub cluster_config: Arc<RwLock<BrokerConfig>>,每次读取需要 .await 获取读锁:
pub async fn get_cluster_config(&self) -> BrokerConfig {
self.cluster_config.read().await.clone()
}tokio::sync::RwLock 是公平锁,写锁优先。这意味着:
- 正常情况下多个读者并发持有读锁,互不阻塞
- 一旦有写者等待(哪怕只是偶发的集群配置更新),所有新的读请求都会排队等待写锁先完成
- 写锁释放后,积压的读请求集中涌入,每个读请求都需要经过 tokio 的异步调度(yield + wake),即使读操作本身只需要纳秒级时间
压测时 1000 个并发连接,每个 connect() 调用 get_cluster_config 3~4 次,同一时刻可能有数百个读请求在异步调度队列中等待。任何一次集群配置的写操作——心跳同步、节点注册、动态配置下发——都会触发一次大规模的读请求排队。
慢的步骤不固定,正是因为这些步骤都调用了 get_cluster_config,被随机阻塞在哪一步就显示哪一步慢。
对比三种锁方案:
| 方案 | 读操作 | 写操作 | 适用场景 |
|---|---|---|---|
tokio::sync::RwLock<T> | async,读者会 yield,高并发下有调度开销;写优先策略导致读排队 | async | 读写频率相近 |
std::sync::RwLock<T> | 同步,不 yield,但会阻塞线程 | 同步,阻塞线程 | 持锁时间极短且无 async 代码 |
ArcSwap<T> | 无锁,原子指针读,无排队,无调度开销 | 原子替换整个指针 | 读极多写极少,整体替换 |
BrokerConfig 的使用场景是典型的"读极多、写极少、每次写都是整体替换":集群配置在启动时加载,偶尔通过管理接口更新,每次更新是整体 replace 而非局部修改。ArcSwap 与这个场景完美契合。
解决方案
将 cluster_config 从 Arc<RwLock<BrokerConfig>> 替换为 ArcSwap<BrokerConfig>:
// 改前
pub cluster_config: Arc<RwLock<BrokerConfig>>,
// 改后
pub cluster_config: ArcSwap<BrokerConfig>,读写操作从异步变为同步:
// 改前
pub async fn get_cluster_config(&self) -> BrokerConfig {
self.cluster_config.read().await.clone()
}
pub async fn set_cluster_config(&self, config: BrokerConfig) {
let mut data = self.cluster_config.write().await;
*data = config;
}
// 改后
pub fn get_cluster_config(&self) -> BrokerConfig {
self.cluster_config.load().as_ref().clone()
}
pub fn set_cluster_config(&self, config: BrokerConfig) {
self.cluster_config.store(Arc::new(config));
}ArcSwap::load() 是一次原子指针读(底层是 SeqCst load),没有锁,没有 yield,没有调度开销,1000 个并发读之间完全无竞争。store() 是原子指针替换,也不需要等待读者退出,写操作不会阻塞任何读操作。
动态配置更新(update_cluster_dynamic_config)改为 load → clone → 修改字段 → store 的模式:
pub fn update_cluster_dynamic_config(
node_cache: &Arc<NodeCacheManager>,
resource_type: ClusterDynamicConfig,
config: Bytes,
) -> Result<(), CommonError> {
let mut new_config = node_cache.get_cluster_config(); // 原子读,clone
match resource_type {
ClusterDynamicConfig::MqttProtocol => {
new_config.mqtt_protocol = serde_json::from_slice(&config)?;
}
// ...
}
node_cache.set_cluster_config(new_config); // 原子替换
Ok(())
}这个模式有一个理论上的 ABA 问题(两次并发写可能互相覆盖),但对于集群配置这种极低频写的场景,这个风险完全可以接受。
效果
修复前,压测日志中偶发出现整批请求的 get_cluster、check_limit、build_conn 步骤耗时突增至 10-20ms:
WARN [connect] slow connect_id=32588 total=46.24ms
get_cluster=18.05ms check_limit=0.24ms build_conn=0.10ms
session_process=27.52ms ...
WARN [connect] slow connect_id=29773 total=30.11ms
get_cluster=2.17ms check_limit=1.36ms build_conn=0.14ms
login_check=2.22ms session_process=23.87ms ...
WARN [connect] slow connect_id=53873 total=39.59ms
get_cluster=0.07ms check_limit=31.03ms build_conn=0.11ms
session_process=8.25ms ...修复后,这三个步骤的耗时全部稳定在 0.00-0.10ms,偶发的 10-20ms 完全消失:
WARN [connect] slow connect_id=10211 total=39.21ms
get_cluster=0.00ms check_limit=0.05ms build_conn=0.01ms
session_process=39.07ms ...
WARN [connect] slow connect_id=38411 total=41.74ms
get_cluster=0.00ms check_limit=0.05ms build_conn=0.01ms
session_process=41.59ms ...get_cluster_config 调用全部去掉 .await,相关函数签名从 async fn 变为普通 fn,减少了不必要的异步调度点。
两个问题的共同本质
这两个问题表面不同,但可以用同一句话概括:在高并发的热路径上使用了错误的并发原语。
DashMap 嵌套的问题是:把"按 tenant 分组"和"按 connect_id 索引"这两个概念混在了一个数据结构里,导致高并发下所有请求集中在同一个锁上。解法是把数据结构扁平化,让锁的粒度和业务访问的实际粒度对齐。
RwLock 的问题是:把一个极少变更的全局配置放在了有异步等待语义的锁里,在高并发读场景下引入了不必要的调度开销。解法是根据访问模式选择匹配的原语——读极多写极少、整体替换,用 ArcSwap。
定位这两个问题的关键,是在热路径上加了精细的计时日志,把每个步骤的耗时分开记录。在没有数据之前,猜测瓶颈在哪里往往是错的。有了数据之后,根因反而非常清晰。
性能优化没有什么神秘的。测量、定位、理解根因、选择合适的工具,然后验证。反复迭代。
附:与 AI 协作的过程
这两个问题是在和 AI 协作的过程中定位和修复的。值得记录一下这个协作模式本身。
AI 做不到的事
定位阶段,AI 基本帮不上忙。
最初尝试让 AI 直接分析代码,判断性能瓶颈在哪里。结果是 AI 会给出一堆猜测,方向几乎全错——因为性能问题是运行时行为,不是静态代码分析能发现的。AI 看不见压测时 1000 个连接全落在同一个 shard 上,也看不见 tokio RwLock 写优先策略在高并发下的排队效应。
感知异常、提出假设、设计实验,这些只能靠人。
第一个问题的突破口是:日志显示大量请求卡在 add_connection,进入这步的 connect_id 打印出来了,但后续步骤迟迟不出现。这个异常是人发现的,不是 AI 发现的。
第二个问题更隐蔽。修完第一个问题后,仍然有请求偶发超时。在 connect() 的各个步骤加了计时之后,日志里出现了一个奇怪的规律:get_cluster、check_limit、build_conn 这三步偶尔一起慢,而且是成批出现。这三步正常情况下只是读一个内存对象,不应该慢。"这几步不该慢,但它们一起慢"——这个观察是人做出的。
AI 擅长的事
定位到问题之后,AI 的价值才真正体现出来。
知识检索:描述了 tokio::sync::RwLock 在写优先策略下的行为,以及"读操作本身只需纳秒但高并发下仍然慢"这个症状,AI 立刻给出了 ArcSwap 这个工具,并解释了为什么它在"读极多写极少、整体替换"的场景下是最优选择。ArcSwap 在此之前并不熟悉,是 AI 推荐的。
执行放大:确定了方向之后,把 19 个文件里涉及 get_cluster_config().await 的调用全部改成同步,同时更新类型定义、调整所有调用方、修改 Cargo.toml 依赖——这些批量的、机械的变更,AI 可以快速准确地完成,不需要逐个文件手工搜索。
这个协作模式的本质
不是"把问题甩给 AI,让 AI 解决",而是:
人负责感知异常、提出假设、设计实验;AI 负责把人的思路快速变成代码和知识。
人是整个 debug 流程的主导者,AI 是执行放大器。这个分工在性能问题上尤其清晰——没有人发现"这三步不该慢但它们一起慢",AI 永远找不到 RwLock 的问题。
有了明确的方向之后,AI 的效率远超手工。但方向本身,只能靠人来给。
