基础设施指标
RobustMQ 提供了全面的基础设施监控指标,帮助运维人员监控系统的健康状态、性能表现和资源使用情况。所有指标都基于 Prometheus 格式,可以通过 /metrics 端点获取。
网络层指标 (Network)
请求处理指标
| 指标名称 | 类型 | 标签 | 描述 | 代码位置 |
|---|---|---|---|---|
request_total_ms | Histogram | network | 请求数据包在 Broker 中处理的总时长(毫秒) | REQUEST_TOTAL_MS |
request_queue_ms | Histogram | network | 请求数据包在 Broker 队列中的等待时长(毫秒) | REQUEST_QUEUE_MS |
request_handler_ms | Histogram | network | 请求数据包在 Broker 中被处理器处理的时长(毫秒) | REQUEST_HANDLER_MS |
request_response_ms | Histogram | network | 请求数据包响应的处理时长(毫秒) | REQUEST_RESPONSE_MS |
request_response_queue_ms | Histogram | network | 请求数据包响应在队列中的等待时长(毫秒) | REQUEST_RESPONSE_QUEUE_MS |
指标说明:
- 所有请求处理指标都使用默认的 Histogram buckets
- 用于测量请求在不同处理阶段的耗时
- 时间单位统一为毫秒(ms)
队列指标
| 指标名称 | 类型 | 标签 | 描述 | 代码位置 |
|---|---|---|---|---|
network_queue_num | Gauge | label | Broker 请求网络队列中的消息数量 | BROKER_NETWORK_REQUEST_QUEUE_NUM |
network_queue_num | Gauge | label | Broker 响应网络队列中的消息数量 | BROKER_NETWORK_RESPONSE_QUEUE_NUM |
指标说明:
- 使用
label字段区分不同的队列类型 - 请求队列和响应队列使用相同的指标名称,通过标签区分
线程指标
| 指标名称 | 类型 | 标签 | 描述 | 代码位置 |
|---|---|---|---|---|
broker_active_thread_num | Gauge | network, thread_type | Broker 启动的执行活跃线程数量 | BROKER_ACTIVE_THREAD_NUM |
线程类型说明:
accept: 接受连接的线程handler: 处理请求的线程response: 响应处理的线程
标签说明
| 标签名 | 取值范围 | 说明 |
|---|---|---|
network | TCP, WebSocket, QUIC | 网络连接类型,来自 NetworkConnectionType 枚举 |
thread_type | accept, handler, response | 线程类型,用于区分不同功能的线程池 |
label | 自定义字符串 | 队列标签,用于标识特定的队列实例 |
相关函数说明
指标记录函数:
rust
// 记录请求处理总耗时
pub fn metrics_request_total_ms(network_connection: &NetworkConnectionType, ms: f64)
// 记录请求队列等待时间
pub fn metrics_request_queue_ms(network_connection: &NetworkConnectionType, ms: f64)
// 记录请求处理器执行时间
pub fn metrics_request_handler_ms(network_connection: &NetworkConnectionType, ms: f64)
// 记录响应队列等待时间
pub fn metrics_request_response_queue_ms(network_connection: &NetworkConnectionType, ms: f64)
// 记录响应处理时间
pub fn metrics_request_response_ms(network_connection: &NetworkConnectionType, ms: f64)
// 记录请求队列大小
pub fn metrics_request_queue_size(label: &str, len: usize)
// 记录响应队列大小
pub fn metrics_response_queue_size(label: &str, len: usize)便捷记录函数:
rust
// 同时记录响应时间和总时间
pub fn record_response_and_total_ms(
connection_type: &NetworkConnectionType,
receive_ms: u128,
response_ms: u128,
)
// 记录 WebSocket 请求的完整时长
pub fn record_ws_request_duration(receive_ms: u128, response_ms: u128)
// 记录 Broker 线程数量(accept, handler, response)
pub fn record_broker_thread_num(
network_connection: &NetworkConnectionType,
(accept, handler, response): (usize, usize, usize),
)gRPC 服务指标
请求统计
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
grpc_requests_total | Counter | method, service | gRPC 请求总数 |
grpc_request_errors_total | Counter | method, service, error_code | gRPC 请求错误总数 |
性能指标
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
grpc_request_duration_milliseconds | Histogram | method, service | gRPC 请求耗时(毫秒) |
grpc_request_size_bytes | Histogram | method, service | gRPC 请求大小(字节) |
grpc_response_size_bytes | Histogram | method, service | gRPC 响应大小(字节) |
标签说明:
method: gRPC 方法名service: gRPC 服务名error_code: 错误代码
HTTP 服务指标
请求统计
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
http_requests_total | Counter | method, path, status_code | HTTP 请求总数 |
http_request_errors_total | Counter | method, path, error_type | HTTP 请求错误总数 |
性能指标
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
http_request_duration_milliseconds | Histogram | method, path | HTTP 请求耗时(毫秒) |
http_request_size_bytes | Histogram | method, path | HTTP 请求大小(字节) |
http_response_size_bytes | Histogram | method, path | HTTP 响应大小(字节) |
标签说明:
method: HTTP 方法(GET, POST, PUT, DELETE)path: 请求路径status_code: HTTP 状态码error_type: 错误类型
存储层指标 (RocksDB)
操作统计
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
rocksdb_operation_count | Counter | source, operation | RocksDB 操作次数 |
rocksdb_operation_ms | Histogram | source, operation | RocksDB 操作耗时(毫秒) |
标签说明:
source: 数据源(如 metadata, session, message)operation: 操作类型(save, get, delete, list)
常见操作类型
- save: 数据写入操作
- get: 数据读取操作
- delete: 数据删除操作
- list: 数据列表查询操作
Broker 核心指标
系统状态
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
broker_status | Gauge | node_id | Broker 节点状态(1=运行,0=停止) |
broker_uptime_seconds | Counter | node_id | Broker 运行时间(秒) |
资源使用
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
broker_memory_usage_bytes | Gauge | node_id, type | 内存使用量(字节) |
broker_cpu_usage_percent | Gauge | node_id | CPU 使用率(百分比) |
broker_disk_usage_bytes | Gauge | node_id, path | 磁盘使用量(字节) |
标签说明:
node_id: 节点标识type: 内存类型(heap, non_heap, total)path: 磁盘路径
元数据服务指标 (Meta Service)
集群状态
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
meta_cluster_nodes_total | Gauge | cluster_id | 集群节点总数 |
meta_cluster_nodes_active | Gauge | cluster_id | 集群活跃节点数 |
meta_leader_elections_total | Counter | cluster_id | 领导者选举次数 |
数据同步
| 指标名称 | 类型 | 标签 | 描述 |
|---|---|---|---|
meta_sync_operations_total | Counter | operation_type | 元数据同步操作次数 |
meta_sync_latency_ms | Histogram | operation_type | 元数据同步延迟(毫秒) |
使用示例
Prometheus 配置
yaml
scrape_configs:
- job_name: 'robustmq'
static_configs:
- targets: ['localhost:9090']
metrics_path: '/metrics'
scrape_interval: 15sGrafana 查询示例
网络层指标查询:
promql
# 请求处理总耗时 P95 分位数(按网络类型)
histogram_quantile(0.95, rate(request_total_ms_bucket[5m]))
# 请求处理总耗时 P50 分位数(按网络类型)
histogram_quantile(0.50, rate(request_total_ms_bucket[5m]))
# 请求队列等待时间 P95(WebSocket)
histogram_quantile(0.95, rate(request_queue_ms_bucket{network="WebSocket"}[5m]))
# 请求处理器执行时间平均值
rate(request_handler_ms_sum[5m]) / rate(request_handler_ms_count[5m])
# 响应处理时间最大值
histogram_quantile(1.0, rate(request_response_ms_bucket[5m]))
# 网络队列积压(请求队列)
network_queue_num{label=~".*request.*"}
# 网络队列积压(响应队列)
network_queue_num{label=~".*response.*"}
# 活跃线程数(按类型)
broker_active_thread_num{thread_type="handler"}
# 所有网络类型的总活跃线程数
sum(broker_active_thread_num) by (network)其他指标查询:
promql
# gRPC 错误率
rate(grpc_request_errors_total[5m]) / rate(grpc_requests_total[5m]) * 100
# RocksDB 操作 QPS
rate(rocksdb_operation_count[5m])
# RocksDB 操作平均延迟
rate(rocksdb_operation_ms_sum[5m]) / rate(rocksdb_operation_ms_count[5m])告警规则示例
yaml
groups:
- name: robustmq_network_metrics
rules:
# 请求处理延迟告警
- alert: HighRequestLatency
expr: histogram_quantile(0.95, rate(request_total_ms_bucket[5m])) > 1000
for: 2m
labels:
severity: warning
annotations:
summary: "请求处理延迟过高"
description: "95% 的请求处理总耗时超过 1000ms,当前值: {{ $value }}ms,网络类型: {{ $labels.network }}"
# 队列等待时间告警
- alert: HighQueueWaitTime
expr: histogram_quantile(0.95, rate(request_queue_ms_bucket[5m])) > 200
for: 2m
labels:
severity: warning
annotations:
summary: "请求队列等待时间过长"
description: "95% 的请求在队列中等待时间超过 200ms,当前值: {{ $value }}ms"
# 处理器执行时间告警
- alert: SlowRequestHandler
expr: histogram_quantile(0.95, rate(request_handler_ms_bucket[5m])) > 500
for: 3m
labels:
severity: warning
annotations:
summary: "请求处理器执行缓慢"
description: "95% 的请求处理器执行时间超过 500ms,当前值: {{ $value }}ms"
# 响应处理延迟告警
- alert: HighResponseLatency
expr: histogram_quantile(0.95, rate(request_response_ms_bucket[5m])) > 300
for: 2m
labels:
severity: warning
annotations:
summary: "响应处理延迟过高"
description: "95% 的响应处理时间超过 300ms,当前值: {{ $value }}ms"
# 网络队列积压告警
- alert: NetworkQueueBacklog
expr: network_queue_num > 10000
for: 1m
labels:
severity: warning
annotations:
summary: "网络队列积压严重"
description: "网络队列消息数量超过 10000,当前值: {{ $value }},队列标签: {{ $labels.label }}"
# 线程数异常告警
- alert: LowActiveThreads
expr: broker_active_thread_num{thread_type="handler"} < 2
for: 1m
labels:
severity: critical
annotations:
summary: "处理线程数过低"
description: "处理线程数低于 2,当前值: {{ $value }},网络类型: {{ $labels.network }}"
- name: robustmq_grpc_metrics
rules:
# gRPC 错误率告警
- alert: HighGrpcErrorRate
expr: rate(grpc_request_errors_total[5m]) / rate(grpc_requests_total[5m]) > 0.05
for: 1m
labels:
severity: critical
annotations:
summary: "gRPC 错误率过高"
description: "gRPC 错误率超过 5%,当前值: {{ $value | humanizePercentage }}"
- name: robustmq_storage_metrics
rules:
# RocksDB 操作缓慢告警
- alert: RocksDBSlowOperations
expr: histogram_quantile(0.95, rate(rocksdb_operation_ms_bucket[5m])) > 100
for: 3m
labels:
severity: warning
annotations:
summary: "RocksDB 操作缓慢"
description: "95% 的 RocksDB 操作耗时超过 100ms,当前值: {{ $value }}ms,操作类型: {{ $labels.operation }}"代码使用示例
基本用法
rust
use common_base::tools::now_mills;
use common_metrics::network::*;
use metadata_struct::connection::NetworkConnectionType;
// 记录单个指标
let network_type = NetworkConnectionType::TCP;
metrics_request_total_ms(&network_type, 125.5);
metrics_request_handler_ms(&network_type, 45.2);
// 记录队列大小
metrics_request_queue_size("tcp_request_queue", 100);
metrics_response_queue_size("tcp_response_queue", 50);完整请求处理示例
rust
use common_metrics::network::*;
use metadata_struct::connection::NetworkConnectionType;
fn handle_request(network_type: NetworkConnectionType) {
let receive_ms = now_mills();
// 处理请求逻辑...
let response_ms = now_mills();
// 记录响应和总时间
record_response_and_total_ms(&network_type, receive_ms, response_ms);
}WebSocket 请求处理示例
rust
use common_metrics::network::record_ws_request_duration;
fn handle_websocket_request() {
let receive_ms = now_mills();
// 处理 WebSocket 请求...
let response_ms = now_mills();
// 自动记录 total_ms、handler_ms 和 response_ms
record_ws_request_duration(receive_ms, response_ms);
}线程数监控示例
rust
use common_metrics::network::record_broker_thread_num;
use metadata_struct::connection::NetworkConnectionType;
fn monitor_thread_pools(network_type: NetworkConnectionType) {
let accept_threads = 4;
let handler_threads = 16;
let response_threads = 8;
// 记录各类型线程数
record_broker_thread_num(
&network_type,
(accept_threads, handler_threads, response_threads)
);
}最佳实践
1. 指标记录时机
- 请求开始时:记录接收时间戳
- 进入队列时:记录队列等待开始时间
- 处理完成时:记录处理时间
- 响应发送前:记录响应处理时间
- 请求结束时:记录总时间
2. 性能优化建议
- 使用便捷函数(如
record_response_and_total_ms)减少重复代码 - 在关键路径上记录指标,避免遗漏
- 合理设置 Histogram buckets 以准确反映延迟分布
- 定期检查队列大小指标,及时发现积压问题
3. 监控指标组合
延迟分析组合:
request_total_ms- 端到端总延迟request_queue_ms+request_handler_ms+request_response_ms- 分段延迟- 通过对比找出瓶颈环节
容量规划组合:
network_queue_num- 队列积压情况broker_active_thread_num- 线程资源使用- 结合分析确定扩容策略
性能诊断组合:
- P50、P95、P99 分位数 - 了解延迟分布
- 按
network标签分组 - 识别特定网络类型问题 - 按
thread_type分组 - 定位线程池瓶颈
相关文件
- 指标定义代码:
src/common/metrics/src/network.rs - Grafana 仪表板:
grafana/robustmq-mqtt-broker-dashboard.json - Prometheus 配置:
grafana/prometheus-config-example.yml - 告警规则:
grafana/robustmq-alerts.yml
通过这些基础设施指标,运维团队可以全面了解 RobustMQ 系统的运行状态,及时发现和解决性能问题,确保系统的稳定运行。
