Skip to content

Broker 配置说明

本文档描述 RobustMQ Broker 服务的所有配置项。日志配置请参考 Logging.md

概述

RobustMQ 使用 TOML 格式的配置文件来管理系统配置。主配置文件为 config/server.toml

配置加载优先级

  1. 环境变量(最高)
  2. 配置文件
  3. 默认值(最低)

环境变量覆盖

支持通过环境变量覆盖配置文件中的设置。命名规则:

ROBUST_MQ_SERVER_{SECTION}_{KEY}
  • 顶层配置项:ROBUST_MQ_SERVER_{KEY}
  • Section 内配置项:ROBUST_MQ_SERVER_{SECTION}_{KEY}
  • 所有字母大写,. 替换为 _

示例:

bash
export ROBUST_MQ_SERVER_CLUSTER_NAME="my-cluster"
export ROBUST_MQ_SERVER_MQTT_SERVER_TCP_PORT=1883
export ROBUST_MQ_SERVER_PROMETHEUS_PORT=9091

1. 基础配置

顶层配置项,定义集群和节点的基本信息。

toml
cluster_name = "robust_mq_cluster_default"
broker_id = 1
broker_ip = "127.0.0.1"
roles = ["broker", "meta"]
grpc_port = 1228
http_port = 8080

[meta_addrs]
1 = "127.0.0.1:1228"
配置项类型默认值说明
cluster_namestring"robust_mq_cluster_default"集群名称,同一集群内所有节点必须一致
broker_idu641节点唯一标识
broker_ipstring自动获取本机 IP节点 IP 地址
rolesarray["broker", "meta"]节点角色列表,可选值:metabrokerengine
grpc_portu321228gRPC 服务端口
http_portu328080HTTP API 服务端口
meta_addrstable{1 = "127.0.0.1:1228"}Meta 节点地址映射,键为节点 ID,值为 IP:端口

部署模式

  • 一体化部署roles = ["meta", "broker", "engine"]
  • 分离式部署
    • Meta 节点:roles = ["meta"]
    • Broker 节点:roles = ["broker"]
    • Engine 节点:roles = ["engine"]

2. 运行时配置

[runtime]

Tokio 运行时与 TLS 配置。RobustMQ 内部划分了三个独立的 Tokio 运行时,分别承担不同职责,可以独立调优。

toml
[runtime]
tls_cert = "./config/certs/cert.pem"
tls_key = "./config/certs/key.pem"
# 各运行时工作线程数,0 = 自动(推荐)
# server_worker_threads = 0
# meta_worker_threads = 0
# broker_worker_threads = 0
配置项类型默认值说明
tls_certstring"./config/certs/cert.pem"TLS 证书文件路径
tls_keystring"./config/certs/key.pem"TLS 私钥文件路径
server_worker_threadsusize0(自动)server-runtime 工作线程数,自动值 = max(4, CPU核数 / 2)
meta_worker_threadsusize0(自动)meta-runtime 工作线程数,自动值 = max(4, CPU核数 / 2)
broker_worker_threadsusize0(自动)broker-runtime 工作线程数,自动值 = CPU核数

三个运行时说明:

运行时职责默认线程数
server-runtimegRPC 服务、HTTP Admin API、Prometheus 指标暴露max(4, CPU/2)
meta-runtimeRaft 状态机、RocksDB 写入max(4, CPU/2)
broker-runtimeMQTT 连接处理、消息投递热路径CPU核数

调优建议: 保持默认值 0 即可。通过 Grafana 的 tokio_runtime_busy_ratio 指标判断是否需要调整:某个运行时繁忙比持续 > 80% 时,可适当增加其线程数。


4. Meta 运行时配置

[meta_runtime]

元数据服务心跳与 Raft 配置。

toml
[meta_runtime]
heartbeat_timeout_ms = 30000
heartbeat_check_time_ms = 1000
raft_write_timeout_sec = 30
offset_raft_group_num = 1
data_raft_group_num = 1
配置项类型默认值说明
heartbeat_timeout_msu6430000节点心跳超时时间(毫秒),超时后标记节点不可用
heartbeat_check_time_msu641000心跳检查间隔(毫秒)
raft_write_timeout_secu6430Raft 写操作超时时间(秒)
offset_raft_group_numu321Offset Raft 分组数量
data_raft_group_numu321数据 Raft 分组数量

5. RocksDB 配置

[rocksdb]

本地 RocksDB 存储配置。

toml
[rocksdb]
data_path = "./data"
max_open_files = 10000
配置项类型默认值说明
data_pathstring"./data"RocksDB 数据存储目录
max_open_filesi3210000最大同时打开文件数

6. 存储引擎运行时配置

[storage_runtime]

Journal 存储引擎运行时配置。

toml
[storage_runtime]
tcp_port = 1778
max_segment_size = 1073741824
io_thread_num = 8
data_path = []

[storage_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
配置项类型默认值说明
tcp_portu321778存储引擎 TCP 端口
max_segment_sizeu321073741824 (1 GB)单个 Segment 文件最大大小(字节)
io_thread_numu328IO 处理线程数
data_patharray[]数据存储路径列表

[storage_runtime.network] 网络线程配置:

配置项类型默认值说明
accept_thread_numusize2接受连接的线程数
handler_thread_numusize16请求处理线程数
response_thread_numusize4响应线程数
queue_sizeusize1000内部处理队列大小

6a. Kafka 运行时配置

[kafka_runtime]

Kafka 协议服务网络线程配置。

toml
[kafka_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
配置项类型默认值说明
accept_thread_numusize2接受连接的线程数
handler_thread_numusize16请求处理线程数
response_thread_numusize4响应线程数
queue_sizeusize1000内部处理队列大小

Kafka Broker 固定监听端口 9095


6b. AMQP 运行时配置

[amqp_runtime]

AMQP 协议服务网络线程配置。

toml
[amqp_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
配置项类型默认值说明
accept_thread_numusize2接受连接的线程数
handler_thread_numusize16请求处理线程数
response_thread_numusize4响应线程数
queue_sizeusize1000内部处理队列大小

AMQP Broker 固定监听端口 5672


7. 消息存储配置

[message_storage]

消息持久化存储后端配置。

toml
[message_storage]
storage_type = "EngineMemory"
配置项类型默认值说明
storage_typestring"EngineMemory"存储类型

存储类型可选值:

说明
EngineMemory内存存储(重启后数据丢失,适用于测试)
EngineSegment基于 Segment 的存储引擎
EngineRocksDB基于 RocksDB 的本地存储
MysqlMySQL 数据库存储
MinIOMinIO 对象存储
S3AWS S3 对象存储

根据所选 storage_type,需配置对应子项:

memory_config(EngineMemory 时可选):

配置项类型默认值说明
max_records_per_shardusize1000每个 Shard 的最大记录数
max_shard_size_limitusize10000000每个 Shard 的最大总大小

mysql_config(Mysql 时):

配置项类型默认值说明
mysql_addrstring""MySQL 数据库地址

minio_config(MinIO 时):

配置项类型默认值说明
data_dirstring""MinIO 数据目录
bucketstring""MinIO Bucket 名称

s3_config(S3 时):

配置项类型默认值说明
endpointstring""S3 端点地址
bucketstring""S3 Bucket 名称
regionstring""S3 Region
access_keystring""访问密钥
secret_keystring""密钥
enable_virtual_host_styleboolfalse是否使用虚拟主机风格访问

8. Offset 存储配置

[storage_offset]

消息消费 Offset 缓存配置。

toml
[storage_offset]
enable_cache = true
配置项类型默认值说明
enable_cachebooltrue是否启用 Offset 缓存

9. MQTT 服务器配置

[mqtt_server]

MQTT 协议监听端口配置。

toml
[mqtt_server]
tcp_port = 1883
tls_port = 1885
websocket_port = 8083
websockets_port = 8085
quic_port = 9083
配置项类型默认值说明
tcp_portu321883MQTT over TCP 端口
tls_portu321885MQTT over TLS 端口
websocket_portu328083MQTT over WebSocket 端口
websockets_portu328085MQTT over WebSocket Secure 端口
quic_portu329083MQTT over QUIC 端口

10. MQTT 运行时配置

[mqtt_runtime]

MQTT 运行时基本参数。

toml
[mqtt_runtime]
default_user = "admin"
default_password = "robustmq"
durable_sessions_enable = false
secret_free_login = false
is_self_protection_status = false

[mqtt_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000
配置项类型默认值说明
default_userstring"admin"系统默认用户名
default_passwordstring"robustmq"系统默认密码
durable_sessions_enableboolfalse是否启用持久会话(false 为临时会话,性能更好)
secret_free_loginboolfalse是否允许免密登录
is_self_protection_statusboolfalse是否处于自我保护状态(连接过载时拒绝新连接)

[mqtt_runtime.network] 网络线程配置:

配置项类型默认值说明
accept_thread_numusize2接受连接的线程数
handler_thread_numusize16请求处理线程数
response_thread_numusize4响应线程数
queue_sizeusize1000内部处理队列大小

11. MQTT Keep Alive 配置

[mqtt_keep_alive]

MQTT 心跳保活配置。

toml
[mqtt_keep_alive]
enable = true
default_time = 180
max_time = 3600
default_timeout = 2
配置项类型默认值说明
enablebooltrue是否启用 Keep Alive 心跳检测
default_timeu16180默认心跳间隔(秒)
max_timeu163600最大心跳间隔(秒)
default_timeoutu162连续超时次数后断开连接

12. MQTT 协议配置

[mqtt_protocol]

MQTT 协议参数配置。

toml
[mqtt_protocol]
max_session_expiry_interval = 1800
default_session_expiry_interval = 30
topic_alias_max = 65535
max_qos_flight_message = 2
max_packet_size = 10485760
receive_max = 65535
max_message_expiry_interval = 3600
client_pkid_persistent = false
配置项类型默认值说明
max_session_expiry_intervalu321800会话最大过期时间(秒)
default_session_expiry_intervalu3230会话默认过期时间(秒)
topic_alias_maxu1665535主题别名最大数量
max_qos_flight_messageu82QoS 飞行窗口最大消息数
max_packet_sizeu3210485760 (10 MB)单个 MQTT 数据包最大大小(字节)
receive_maxu1665535未确认的 PUBLISH 数据包最大数量
max_message_expiry_intervalu643600消息最大过期时间(秒)
client_pkid_persistentboolfalse是否持久化客户端 Packet ID

13. 限流配置

[limit]

集群和租户级别的资源限流配置。

toml
[limit.cluster]
max_connections_per_node = 10000000
max_create_connection_rate_per_second = 100000
max_topics = 5000000
max_sessions = 50000000
max_publish_rate = 10000

[limit.tenant]
max_connections_per_node = 1000000
max_create_connection_rate_per_second = 10000
max_topics = 500000
max_sessions = 5000000
max_publish_rate = 10000
配置项类型说明
max_connections_per_nodeu64每节点最大连接数
max_create_connection_rate_per_secondu32每秒最大新建连接速率
max_topicsu64最大 Topic 数量
max_sessionsu64最大 Session 数量
max_publish_rateu32每秒最大 Publish 消息速率

15. MQTT 离线消息配置

[mqtt_offline_message]

客户端离线期间的消息存储配置。

toml
[mqtt_offline_message]
enable = true
expire_ms = 0
max_messages_num = 0
配置项类型默认值说明
enablebooltrue是否启用离线消息
expire_msu320离线消息过期时间(毫秒),0 表示不过期
max_messages_numu320每个客户端最大离线消息数,0 表示无限制

16. MQTT 连接抖动检测配置

[mqtt_flapping_detect]

检测客户端频繁连接/断开(抖动)并自动封禁。

toml
[mqtt_flapping_detect]
enable = false
window_time = 1
max_client_connections = 15
ban_time = 5
配置项类型默认值说明
enableboolfalse是否启用连接抖动检测
window_timeu321检测时间窗口(秒)
max_client_connectionsu6415时间窗口内最大连接次数
ban_timeu325触发抖动后封禁时间(秒)

17. MQTT 慢订阅检测配置

[mqtt_slow_subscribe]

慢订阅监控配置,用于检测消息分发延迟。

toml
[mqtt_slow_subscribe]
enable = false
record_time = 1000
delay_type = "Whole"
配置项类型默认值说明
enableboolfalse是否启用慢订阅检测
record_timeu641000慢订阅记录阈值(毫秒)
delay_typestring"Whole"延迟计算类型:Whole(全链路)、Partial(部分)

18. MQTT Schema 验证配置

[mqtt_schema]

消息 Schema 验证配置。

toml
[mqtt_schema]
enable = true
strategy = "ALL"
failed_operation = "Discard"
echo_log = true
log_level = "info"
配置项类型默认值说明
enablebooltrue是否启用 Schema 验证
strategystring"ALL"验证策略
failed_operationstring"Discard"验证失败时的操作
echo_logbooltrue是否输出 Schema 验证日志
log_levelstring"info"Schema 验证日志级别

验证策略(strategy):

  • ALL:消息必须通过所有绑定的 Schema 验证
  • Any:消息只需通过任一 Schema 验证

失败操作(failed_operation):

  • Discard:丢弃验证失败的消息
  • DisconnectAndDiscard:断开连接并丢弃消息
  • Ignore:忽略验证失败,继续处理

19. MQTT 系统监控配置

[mqtt_system_monitor]

系统资源监控配置。

toml
[mqtt_system_monitor]
enable = false
os_cpu_high_watermark = 70.0
os_memory_high_watermark = 80.0
配置项类型默认值说明
enableboolfalse是否启用系统资源监控
os_cpu_high_watermarkf3270.0CPU 使用率高水位线(%)
os_memory_high_watermarkf3280.0内存使用率高水位线(%)

20. gRPC 客户端配置

[grpc_client]

控制 Broker 内部 gRPC 客户端的连接池行为。RobustMQ 使用 HTTP/2 进行节点间通信,每个 Channel 是一条独立的 TCP 连接,支持多路复用。

toml
[grpc_client]
channels_per_address = 4
配置项类型默认值说明
channels_per_addressusize4每个 gRPC 服务地址维护的 HTTP/2 Channel 数量

调优说明:

每个 HTTP/2 Channel 支持约 200 个并发 Stream(即并发 RPC 请求),默认值 4 可支撑约 800 个并发 gRPC 请求,覆盖绝大多数生产场景。

场景建议值
默认 / 常规生产4
高并发(万级 MQTT 连接)8 ~ 16
极高并发压测32

注意: 该值过大会导致系统打开的 TCP 文件描述符数量暴增(每个 Channel 占用一个 fd),在 ulimit -n 较小的环境下可能引发 Too many open files 错误。

环境变量:

bash
export ROBUST_MQ_SERVER_GRPC_CLIENT_CHANNELS_PER_ADDRESS=8

21. LLM 客户端配置

[llm_client]

用于配置 Broker 内部统一的 LLM 调用客户端(LLMClient)。该配置为可选项,不配置时不会启用 LLM 客户端。

toml
[llm_client]
platform = "open_ai"
model = "gpt-4o-mini"
token = "your_api_token"
# 可选:用于 OpenAI 兼容网关或私有部署
# base_url = "https://api.openai.com/v1/"
配置项类型默认值说明
platformstringLLM 平台标识
modelstring模型名称,如 gpt-4o-miniclaude-3-5-sonnetgemini-2.0-flash
tokenstring访问令牌。除 ollama 外其余平台必填
base_urlstring自定义 API 基地址(可选)

base_url 说明(重点):

  • 不填 base_url 时,会使用 genai 的默认官方 endpoint。
  • 只有在以下场景建议填写:使用代理网关、OpenAI 兼容服务、私有化部署、内网转发。
  • ollama 不填时默认走本机:http://localhost:11434/v1/

常见平台不填 base_url 的默认行为:

platformbase_url 可否省略默认 endpoint
open_ai / open_ai_resp可以OpenAI 官方
gemini可以Google Gemini 官方
anthropic可以Anthropic 官方
cohere可以Cohere 官方
xai可以xAI 官方
deep_seek可以DeepSeek 官方
groq / together / fireworks / nebius / mimo / zai / big_model可以各平台官方
ollama可以http://localhost:11434/v1/

platform 可选值:

  • open_ai
  • open_ai_resp
  • gemini
  • anthropic
  • fireworks
  • together
  • groq
  • mimo
  • nebius
  • xai
  • deep_seek
  • zai
  • big_model
  • cohere
  • ollama

环境变量示例:

bash
export ROBUST_MQ_SERVER_LLM_CLIENT_PLATFORM=open_ai
export ROBUST_MQ_SERVER_LLM_CLIENT_MODEL=gpt-4o-mini
export ROBUST_MQ_SERVER_LLM_CLIENT_TOKEN=your_api_token
# export ROBUST_MQ_SERVER_LLM_CLIENT_BASE_URL=https://api.openai.com/v1/

22. 监控配置

[prometheus]

Prometheus 指标暴露配置。

toml
[prometheus]
enable = true
port = 9090
配置项类型默认值说明
enablebooltrue是否启用 Prometheus 指标收集
portu329090Prometheus 指标暴露端口

[pprof]

PProf 性能分析配置。

toml
[pprof]
enable = false
port = 6060
frequency = 100
配置项类型默认值说明
enableboolfalse是否启用 PProf 性能分析
portu166060PProf 服务端口
frequencyi32100采样频率

完整配置示例

toml
# ========== 基础配置 ==========
cluster_name = "production-cluster"
broker_id = 1
roles = ["meta", "broker", "engine"]
grpc_port = 1228
http_port = 8080

[meta_addrs]
1 = "192.168.1.10:1228"
2 = "192.168.1.11:1228"
3 = "192.168.1.12:1228"

# ========== 运行时 ==========
[runtime]
tls_cert = "./config/certs/cert.pem"
tls_key = "./config/certs/key.pem"
# server_worker_threads = 0
# meta_worker_threads = 0
# broker_worker_threads = 0

# ========== Meta ==========
[meta_runtime]
heartbeat_timeout_ms = 30000
heartbeat_check_time_ms = 1000
raft_write_timeout_sec = 30
offset_raft_group_num = 1
data_raft_group_num = 1

# ========== RocksDB ==========
[rocksdb]
data_path = "/data/robustmq"
max_open_files = 20000

# ========== 存储引擎 ==========
[storage_runtime]
tcp_port = 1778
max_segment_size = 1073741824
io_thread_num = 8

[storage_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000

# ========== Kafka 运行时 ==========
[kafka_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000

# ========== AMQP 运行时 ==========
[amqp_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000

# ========== 消息存储 ==========
[message_storage]
storage_type = "EngineMemory"

# ========== Offset 缓存 ==========
[storage_offset]
enable_cache = true

# ========== MQTT 服务器 ==========
[mqtt_server]
tcp_port = 1883
tls_port = 1885
websocket_port = 8083
websockets_port = 8085
quic_port = 9083

# ========== MQTT 运行时 ==========
[mqtt_runtime]
default_user = "admin"
default_password = "your_secure_password"
durable_sessions_enable = false
secret_free_login = false
is_self_protection_status = false

[mqtt_runtime.network]
accept_thread_num = 2
handler_thread_num = 16
response_thread_num = 4
queue_size = 1000

# ========== MQTT Keep Alive ==========
[mqtt_keep_alive]
enable = true
default_time = 180
max_time = 3600
default_timeout = 2

# ========== MQTT 协议 ==========
[mqtt_protocol]
max_session_expiry_interval = 1800
default_session_expiry_interval = 30
topic_alias_max = 65535
max_qos_flight_message = 2
max_packet_size = 10485760
receive_max = 65535
max_message_expiry_interval = 3600
client_pkid_persistent = false

# ========== MQTT 离线消息 ==========
[mqtt_offline_message]
enable = true
expire_ms = 0
max_messages_num = 0

# ========== MQTT 抖动检测 ==========
[mqtt_flapping_detect]
enable = false
window_time = 1
max_client_connections = 15
ban_time = 5

# ========== MQTT 慢订阅 ==========
[mqtt_slow_subscribe]
enable = false
record_time = 1000
delay_type = "Whole"

# ========== MQTT Schema ==========
[mqtt_schema]
enable = true
strategy = "ALL"
failed_operation = "Discard"
echo_log = true
log_level = "info"

# ========== MQTT 系统监控 ==========
[mqtt_system_monitor]
enable = false
os_cpu_high_watermark = 70.0
os_memory_high_watermark = 80.0

# ========== 监控 ==========
[prometheus]
enable = true
port = 9090

[pprof]
enable = false
port = 6060
frequency = 100

# ========== gRPC 客户端 ==========
[grpc_client]
channels_per_address = 4

# ========== LLM 客户端(可选) ==========
[llm_client]
platform = "open_ai"
model = "gpt-4o-mini"
token = "your_api_token"
# base_url = "https://api.openai.com/v1/"

# ========== 日志 ==========
[log]
log_config = "./config/broker-tracing.toml"
log_path = "./logs"
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀