Broker 配置说明
本文档描述 RobustMQ Broker 服务的所有配置项。日志配置请参考 Logging.md。
概述
RobustMQ 使用 TOML 格式的配置文件来管理系统配置。主配置文件为 config/server.toml。
配置加载优先级
- 环境变量(最高)
- 配置文件
- 默认值(最低)
环境变量覆盖
支持通过环境变量覆盖配置文件中的设置。命名规则:
ROBUST_MQ_SERVER_{SECTION}_{KEY}- 顶层配置项:
ROBUST_MQ_SERVER_{KEY} - Section 内配置项:
ROBUST_MQ_SERVER_{SECTION}_{KEY} - 所有字母大写,
.替换为_
示例:
export ROBUST_MQ_SERVER_CLUSTER_NAME="my-cluster"
export ROBUST_MQ_SERVER_MQTT_SERVER_TCP_PORT=1883
export ROBUST_MQ_SERVER_PROMETHEUS_PORT=90911. 基础配置
顶层配置项,定义集群和节点的基本信息。
cluster_name = "robust_mq_cluster_default"
broker_id = 1
broker_ip = "127.0.0.1"
roles = ["broker", "meta"]
grpc_port = 1228
http_port = 8080
[meta_addrs]
1 = "127.0.0.1:1228"| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
cluster_name | string | "robust_mq_cluster_default" | 集群名称,同一集群内所有节点必须一致 |
broker_id | u64 | 1 | 节点唯一标识 |
broker_ip | string | 自动获取本机 IP | 节点 IP 地址 |
roles | array | ["broker", "meta"] | 节点角色列表,可选值:meta、broker、engine |
grpc_port | u32 | 1228 | gRPC 服务端口 |
http_port | u32 | 8080 | HTTP API 服务端口 |
meta_addrs | table | {1 = "127.0.0.1:1228"} | Meta 节点地址映射,键为节点 ID,值为 IP:端口 |
部署模式
- 一体化部署:
roles = ["meta", "broker", "engine"] - 分离式部署:
- Meta 节点:
roles = ["meta"] - Broker 节点:
roles = ["broker"] - Engine 节点:
roles = ["engine"]
- Meta 节点:
2. 运行时配置
[runtime]
Tokio 运行时与 TLS 配置。RobustMQ 内部划分了三个独立的 Tokio 运行时,分别承担不同职责,可以独立调优。
[runtime]
tls_cert = "./config/certs/cert.pem"
tls_key = "./config/certs/key.pem"
# 各运行时工作线程数,0 = 自动(推荐)
# server_worker_threads = 0
# meta_worker_threads = 0
# broker_worker_threads = 0| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
tls_cert | string | "./config/certs/cert.pem" | TLS 证书文件路径 |
tls_key | string | "./config/certs/key.pem" | TLS 私钥文件路径 |
server_worker_threads | usize | 0(自动) | server-runtime 工作线程数,自动值 = max(4, CPU核数 / 2) |
meta_worker_threads | usize | 0(自动) | meta-runtime 工作线程数,自动值 = max(4, CPU核数 / 2) |
broker_worker_threads | usize | 0(自动) | broker-runtime 工作线程数,自动值 = CPU核数 |
三个运行时说明:
| 运行时 | 职责 | 默认线程数 |
|---|---|---|
server-runtime | gRPC 服务、HTTP Admin API、Prometheus 指标暴露 | max(4, CPU/2) |
meta-runtime | Raft 状态机、RocksDB 写入 | max(4, CPU/2) |
broker-runtime | MQTT 连接处理、消息投递热路径 | CPU核数 |
调优建议: 保持默认值
0即可。通过 Grafana 的tokio_runtime_busy_ratio指标判断是否需要调整:某个运行时繁忙比持续 > 80% 时,可适当增加其线程数。
4. Meta 运行时配置
[meta_runtime]
元数据服务心跳与 Raft 配置。
[meta_runtime]
heartbeat_timeout_ms = 30000
heartbeat_check_time_ms = 1000
raft_write_timeout_sec = 30
offset_raft_group_num = 1
data_raft_group_num = 1| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
heartbeat_timeout_ms | u64 | 30000 | 节点心跳超时时间(毫秒),超时后标记节点不可用 |
heartbeat_check_time_ms | u64 | 1000 | 心跳检查间隔(毫秒) |
raft_write_timeout_sec | u64 | 30 | Raft 写操作超时时间(秒) |
offset_raft_group_num | u32 | 1 | Offset Raft 分组数量 |
data_raft_group_num | u32 | 1 | 数据 Raft 分组数量 |
5. RocksDB 配置
[rocksdb]
本地 RocksDB 存储配置。
[rocksdb]
data_path = "./data"
max_open_files = 10000| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
data_path | string | "./data" | RocksDB 数据存储目录 |
max_open_files | i32 | 10000 | 最大同时打开文件数 |
6. 存储引擎运行时配置
[storage_runtime]
Journal 存储引擎运行时配置。
[storage_runtime]
tcp_port = 1778
max_segment_size = 1073741824
io_thread_num = 8
data_path = []
[storage_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
tcp_port | u32 | 1778 | 存储引擎 TCP 端口 |
max_segment_size | u32 | 1073741824 (1 GB) | 单个 Segment 文件最大大小(字节) |
io_thread_num | u32 | 8 | IO 处理线程数 |
data_path | array | [] | 数据存储路径列表 |
[storage_runtime.network] 网络线程配置:
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
accept_thread_num | usize | 2 | 接受连接的线程数 |
handler_thread_num | usize | 16 | 请求处理线程数 |
response_thread_num | usize | 4 | 响应线程数 |
queue_size | usize | 1000 | 内部处理队列大小 |
6a. Kafka 运行时配置
[kafka_runtime]
Kafka 协议服务网络线程配置。
[kafka_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
accept_thread_num | usize | 2 | 接受连接的线程数 |
handler_thread_num | usize | 16 | 请求处理线程数 |
response_thread_num | usize | 4 | 响应线程数 |
queue_size | usize | 1000 | 内部处理队列大小 |
Kafka Broker 固定监听端口
9095。
6b. AMQP 运行时配置
[amqp_runtime]
AMQP 协议服务网络线程配置。
[amqp_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
accept_thread_num | usize | 2 | 接受连接的线程数 |
handler_thread_num | usize | 16 | 请求处理线程数 |
response_thread_num | usize | 4 | 响应线程数 |
queue_size | usize | 1000 | 内部处理队列大小 |
AMQP Broker 固定监听端口
5672。
7. 消息存储配置
[message_storage]
消息持久化存储后端配置。
[message_storage]
storage_type = "EngineMemory"| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
storage_type | string | "EngineMemory" | 存储类型 |
存储类型可选值:
| 值 | 说明 |
|---|---|
EngineMemory | 内存存储(重启后数据丢失,适用于测试) |
EngineSegment | 基于 Segment 的存储引擎 |
EngineRocksDB | 基于 RocksDB 的本地存储 |
Mysql | MySQL 数据库存储 |
MinIO | MinIO 对象存储 |
S3 | AWS S3 对象存储 |
根据所选 storage_type,需配置对应子项:
memory_config(EngineMemory 时可选):
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_records_per_shard | usize | 1000 | 每个 Shard 的最大记录数 |
max_shard_size_limit | usize | 10000000 | 每个 Shard 的最大总大小 |
mysql_config(Mysql 时):
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
mysql_addr | string | "" | MySQL 数据库地址 |
minio_config(MinIO 时):
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
data_dir | string | "" | MinIO 数据目录 |
bucket | string | "" | MinIO Bucket 名称 |
s3_config(S3 时):
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
endpoint | string | "" | S3 端点地址 |
bucket | string | "" | S3 Bucket 名称 |
region | string | "" | S3 Region |
access_key | string | "" | 访问密钥 |
secret_key | string | "" | 密钥 |
enable_virtual_host_style | bool | false | 是否使用虚拟主机风格访问 |
8. Offset 存储配置
[storage_offset]
消息消费 Offset 缓存配置。
[storage_offset]
enable_cache = true| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable_cache | bool | true | 是否启用 Offset 缓存 |
9. MQTT 服务器配置
[mqtt_server]
MQTT 协议监听端口配置。
[mqtt_server]
tcp_port = 1883
tls_port = 1885
websocket_port = 8083
websockets_port = 8085
quic_port = 9083| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
tcp_port | u32 | 1883 | MQTT over TCP 端口 |
tls_port | u32 | 1885 | MQTT over TLS 端口 |
websocket_port | u32 | 8083 | MQTT over WebSocket 端口 |
websockets_port | u32 | 8085 | MQTT over WebSocket Secure 端口 |
quic_port | u32 | 9083 | MQTT over QUIC 端口 |
10. MQTT 运行时配置
[mqtt_runtime]
MQTT 运行时基本参数。
[mqtt_runtime]
default_user = "admin"
default_password = "robustmq"
durable_sessions_enable = false
secret_free_login = false
is_self_protection_status = false
[mqtt_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
default_user | string | "admin" | 系统默认用户名 |
default_password | string | "robustmq" | 系统默认密码 |
durable_sessions_enable | bool | false | 是否启用持久会话(false 为临时会话,性能更好) |
secret_free_login | bool | false | 是否允许免密登录 |
is_self_protection_status | bool | false | 是否处于自我保护状态(连接过载时拒绝新连接) |
[mqtt_runtime.network] 网络线程配置:
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
accept_thread_num | usize | 2 | 接受连接的线程数 |
handler_thread_num | usize | 16 | 请求处理线程数 |
response_thread_num | usize | 4 | 响应线程数 |
queue_size | usize | 1000 | 内部处理队列大小 |
11. MQTT Keep Alive 配置
[mqtt_keep_alive]
MQTT 心跳保活配置。
[mqtt_keep_alive]
enable = true
default_time = 180
max_time = 3600
default_timeout = 2| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | true | 是否启用 Keep Alive 心跳检测 |
default_time | u16 | 180 | 默认心跳间隔(秒) |
max_time | u16 | 3600 | 最大心跳间隔(秒) |
default_timeout | u16 | 2 | 连续超时次数后断开连接 |
12. MQTT 协议配置
[mqtt_protocol]
MQTT 协议参数配置。
[mqtt_protocol]
max_session_expiry_interval = 1800
default_session_expiry_interval = 30
topic_alias_max = 65535
max_qos_flight_message = 2
max_packet_size = 10485760
receive_max = 65535
max_message_expiry_interval = 3600
client_pkid_persistent = false| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_session_expiry_interval | u32 | 1800 | 会话最大过期时间(秒) |
default_session_expiry_interval | u32 | 30 | 会话默认过期时间(秒) |
topic_alias_max | u16 | 65535 | 主题别名最大数量 |
max_qos_flight_message | u8 | 2 | QoS 飞行窗口最大消息数 |
max_packet_size | u32 | 10485760 (10 MB) | 单个 MQTT 数据包最大大小(字节) |
receive_max | u16 | 65535 | 未确认的 PUBLISH 数据包最大数量 |
max_message_expiry_interval | u64 | 3600 | 消息最大过期时间(秒) |
client_pkid_persistent | bool | false | 是否持久化客户端 Packet ID |
13. 限流配置
[limit]
集群和租户级别的资源限流配置。
[limit.cluster]
max_connections_per_node = 10000000
max_create_connection_rate_per_second = 100000
max_topics = 5000000
max_sessions = 50000000
max_publish_rate = 10000
[limit.tenant]
max_connections_per_node = 1000000
max_create_connection_rate_per_second = 10000
max_topics = 500000
max_sessions = 5000000
max_publish_rate = 10000| 配置项 | 类型 | 说明 |
|---|---|---|
max_connections_per_node | u64 | 每节点最大连接数 |
max_create_connection_rate_per_second | u32 | 每秒最大新建连接速率 |
max_topics | u64 | 最大 Topic 数量 |
max_sessions | u64 | 最大 Session 数量 |
max_publish_rate | u32 | 每秒最大 Publish 消息速率 |
15. MQTT 离线消息配置
[mqtt_offline_message]
客户端离线期间的消息存储配置。
[mqtt_offline_message]
enable = true
expire_ms = 0
max_messages_num = 0| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | true | 是否启用离线消息 |
expire_ms | u32 | 0 | 离线消息过期时间(毫秒),0 表示不过期 |
max_messages_num | u32 | 0 | 每个客户端最大离线消息数,0 表示无限制 |
16. MQTT 连接抖动检测配置
[mqtt_flapping_detect]
检测客户端频繁连接/断开(抖动)并自动封禁。
[mqtt_flapping_detect]
enable = false
window_time = 1
max_client_connections = 15
ban_time = 5| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | false | 是否启用连接抖动检测 |
window_time | u32 | 1 | 检测时间窗口(秒) |
max_client_connections | u64 | 15 | 时间窗口内最大连接次数 |
ban_time | u32 | 5 | 触发抖动后封禁时间(秒) |
17. MQTT 慢订阅检测配置
[mqtt_slow_subscribe]
慢订阅监控配置,用于检测消息分发延迟。
[mqtt_slow_subscribe]
enable = false
record_time = 1000
delay_type = "Whole"| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | false | 是否启用慢订阅检测 |
record_time | u64 | 1000 | 慢订阅记录阈值(毫秒) |
delay_type | string | "Whole" | 延迟计算类型:Whole(全链路)、Partial(部分) |
18. MQTT Schema 验证配置
[mqtt_schema]
消息 Schema 验证配置。
[mqtt_schema]
enable = true
strategy = "ALL"
failed_operation = "Discard"
echo_log = true
log_level = "info"| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | true | 是否启用 Schema 验证 |
strategy | string | "ALL" | 验证策略 |
failed_operation | string | "Discard" | 验证失败时的操作 |
echo_log | bool | true | 是否输出 Schema 验证日志 |
log_level | string | "info" | Schema 验证日志级别 |
验证策略(strategy):
ALL:消息必须通过所有绑定的 Schema 验证Any:消息只需通过任一 Schema 验证
失败操作(failed_operation):
Discard:丢弃验证失败的消息DisconnectAndDiscard:断开连接并丢弃消息Ignore:忽略验证失败,继续处理
19. MQTT 系统监控配置
[mqtt_system_monitor]
系统资源监控配置。
[mqtt_system_monitor]
enable = false
os_cpu_high_watermark = 70.0
os_memory_high_watermark = 80.0| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | false | 是否启用系统资源监控 |
os_cpu_high_watermark | f32 | 70.0 | CPU 使用率高水位线(%) |
os_memory_high_watermark | f32 | 80.0 | 内存使用率高水位线(%) |
20. gRPC 客户端配置
[grpc_client]
控制 Broker 内部 gRPC 客户端的连接池行为。RobustMQ 使用 HTTP/2 进行节点间通信,每个 Channel 是一条独立的 TCP 连接,支持多路复用。
[grpc_client]
channels_per_address = 4| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
channels_per_address | usize | 4 | 每个 gRPC 服务地址维护的 HTTP/2 Channel 数量 |
调优说明:
每个 HTTP/2 Channel 支持约 200 个并发 Stream(即并发 RPC 请求),默认值 4 可支撑约 800 个并发 gRPC 请求,覆盖绝大多数生产场景。
| 场景 | 建议值 |
|---|---|
| 默认 / 常规生产 | 4 |
| 高并发(万级 MQTT 连接) | 8 ~ 16 |
| 极高并发压测 | 32 |
注意: 该值过大会导致系统打开的 TCP 文件描述符数量暴增(每个 Channel 占用一个 fd),在
ulimit -n较小的环境下可能引发Too many open files错误。
环境变量:
export ROBUST_MQ_SERVER_GRPC_CLIENT_CHANNELS_PER_ADDRESS=821. LLM 客户端配置
[llm_client]
用于配置 Broker 内部统一的 LLM 调用客户端(LLMClient)。该配置为可选项,不配置时不会启用 LLM 客户端。
[llm_client]
platform = "open_ai"
model = "gpt-4o-mini"
token = "your_api_token"
# 可选:用于 OpenAI 兼容网关或私有部署
# base_url = "https://api.openai.com/v1/"| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
platform | string | 无 | LLM 平台标识 |
model | string | 无 | 模型名称,如 gpt-4o-mini、claude-3-5-sonnet、gemini-2.0-flash |
token | string | 无 | 访问令牌。除 ollama 外其余平台必填 |
base_url | string | 无 | 自定义 API 基地址(可选) |
base_url 说明(重点):
- 不填
base_url时,会使用genai的默认官方 endpoint。 - 只有在以下场景建议填写:使用代理网关、OpenAI 兼容服务、私有化部署、内网转发。
ollama不填时默认走本机:http://localhost:11434/v1/。
常见平台不填 base_url 的默认行为:
platform | base_url 可否省略 | 默认 endpoint |
|---|---|---|
open_ai / open_ai_resp | 可以 | OpenAI 官方 |
gemini | 可以 | Google Gemini 官方 |
anthropic | 可以 | Anthropic 官方 |
cohere | 可以 | Cohere 官方 |
xai | 可以 | xAI 官方 |
deep_seek | 可以 | DeepSeek 官方 |
groq / together / fireworks / nebius / mimo / zai / big_model | 可以 | 各平台官方 |
ollama | 可以 | http://localhost:11434/v1/ |
platform 可选值:
open_aiopen_ai_respgeminianthropicfireworkstogethergroqmimonebiusxaideep_seekzaibig_modelcohereollama
环境变量示例:
export ROBUST_MQ_SERVER_LLM_CLIENT_PLATFORM=open_ai
export ROBUST_MQ_SERVER_LLM_CLIENT_MODEL=gpt-4o-mini
export ROBUST_MQ_SERVER_LLM_CLIENT_TOKEN=your_api_token
# export ROBUST_MQ_SERVER_LLM_CLIENT_BASE_URL=https://api.openai.com/v1/22. 监控配置
[prometheus]
Prometheus 指标暴露配置。
[prometheus]
enable = true
port = 9090| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | true | 是否启用 Prometheus 指标收集 |
port | u32 | 9090 | Prometheus 指标暴露端口 |
[pprof]
PProf 性能分析配置。
[pprof]
enable = false
port = 6060
frequency = 100| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
enable | bool | false | 是否启用 PProf 性能分析 |
port | u16 | 6060 | PProf 服务端口 |
frequency | i32 | 100 | 采样频率 |
完整配置示例
# ========== 基础配置 ==========
cluster_name = "production-cluster"
broker_id = 1
roles = ["meta", "broker", "engine"]
grpc_port = 1228
http_port = 8080
[meta_addrs]
1 = "192.168.1.10:1228"
2 = "192.168.1.11:1228"
3 = "192.168.1.12:1228"
# ========== 运行时 ==========
[runtime]
tls_cert = "./config/certs/cert.pem"
tls_key = "./config/certs/key.pem"
# server_worker_threads = 0
# meta_worker_threads = 0
# broker_worker_threads = 0
# ========== Meta ==========
[meta_runtime]
heartbeat_timeout_ms = 30000
heartbeat_check_time_ms = 1000
raft_write_timeout_sec = 30
offset_raft_group_num = 1
data_raft_group_num = 1
# ========== RocksDB ==========
[rocksdb]
data_path = "/data/robustmq"
max_open_files = 20000
# ========== 存储引擎 ==========
[storage_runtime]
tcp_port = 1778
max_segment_size = 1073741824
io_thread_num = 8
[storage_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
# ========== Kafka 运行时 ==========
[kafka_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
# ========== AMQP 运行时 ==========
[amqp_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
# ========== 消息存储 ==========
[message_storage]
storage_type = "EngineMemory"
# ========== Offset 缓存 ==========
[storage_offset]
enable_cache = true
# ========== MQTT 服务器 ==========
[mqtt_server]
tcp_port = 1883
tls_port = 1885
websocket_port = 8083
websockets_port = 8085
quic_port = 9083
# ========== MQTT 运行时 ==========
[mqtt_runtime]
default_user = "admin"
default_password = "your_secure_password"
durable_sessions_enable = false
secret_free_login = false
is_self_protection_status = false
[mqtt_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
# ========== MQTT Keep Alive ==========
[mqtt_keep_alive]
enable = true
default_time = 180
max_time = 3600
default_timeout = 2
# ========== MQTT 协议 ==========
[mqtt_protocol]
max_session_expiry_interval = 1800
default_session_expiry_interval = 30
topic_alias_max = 65535
max_qos_flight_message = 2
max_packet_size = 10485760
receive_max = 65535
max_message_expiry_interval = 3600
client_pkid_persistent = false
# ========== MQTT 离线消息 ==========
[mqtt_offline_message]
enable = true
expire_ms = 0
max_messages_num = 0
# ========== MQTT 抖动检测 ==========
[mqtt_flapping_detect]
enable = false
window_time = 1
max_client_connections = 15
ban_time = 5
# ========== MQTT 慢订阅 ==========
[mqtt_slow_subscribe]
enable = false
record_time = 1000
delay_type = "Whole"
# ========== MQTT Schema ==========
[mqtt_schema]
enable = true
strategy = "ALL"
failed_operation = "Discard"
echo_log = true
log_level = "info"
# ========== MQTT 系统监控 ==========
[mqtt_system_monitor]
enable = false
os_cpu_high_watermark = 70.0
os_memory_high_watermark = 80.0
# ========== 监控 ==========
[prometheus]
enable = true
port = 9090
[pprof]
enable = false
port = 6060
frequency = 100
# ========== gRPC 客户端 ==========
[grpc_client]
channels_per_address = 4
# ========== LLM 客户端(可选) ==========
[llm_client]
platform = "open_ai"
model = "gpt-4o-mini"
token = "your_api_token"
# base_url = "https://api.openai.com/v1/"
# ========== 日志 ==========
[log]
log_config = "./config/broker-tracing.toml"
log_path = "./logs"