MQTT Broker 管理 HTTP API
本文档介绍 MQTT Broker 相关的所有 HTTP API 接口。通用信息请参考 COMMON.md。
API 接口列表
1. 集群概览
1.1 集群概览信息
- 接口:
POST /mqtt/overview
- 描述: 获取 MQTT 集群概览信息
- 请求参数: 空 JSON 对象
json
{}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"node_list": [
{
"node_id": 1,
"node_ip": "192.168.1.100",
"node_inner_addr": "192.168.1.100:9981",
"extend_info": "{}",
"create_time": 1640995200
}
],
"cluster_name": "robustmq-cluster",
"message_in_rate": 100,
"message_out_rate": 85,
"connection_num": 1500,
"session_num": 1200,
"topic_num": 50,
"placement_status": "Leader",
"tcp_connection_num": 800,
"tls_connection_num": 400,
"websocket_connection_num": 200,
"quic_connection_num": 100,
"subscribe_num": 2000,
"exclusive_subscribe_num": 1500,
"share_subscribe_leader_num": 300,
"share_subscribe_resub_num": 200,
"exclusive_subscribe_thread_num": 8,
"share_subscribe_leader_thread_num": 4,
"share_subscribe_follower_thread_num": 4
}
}
1.2 集群监控指标
- 接口:
POST /mqtt/overview/metrics
- 描述: 获取指定时间范围内的集群监控指标
- 请求参数:
json
{
"start_time": 1640995200, // Unix 时间戳,开始时间
"end_time": 1640998800 // Unix 时间戳,结束时间
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"connection_num": "[{\"timestamp\":1640995200,\"value\":1500}]",
"topic_num": "[{\"timestamp\":1640995200,\"value\":50}]",
"subscribe_num": "[{\"timestamp\":1640995200,\"value\":2000}]",
"message_in_num": "[{\"timestamp\":1640995200,\"value\":10000}]",
"message_out_num": "[{\"timestamp\":1640995200,\"value\":8500}]",
"message_drop_num": "[{\"timestamp\":1640995200,\"value\":15}]"
}
}
2. 客户端管理
2.1 客户端列表查询
- 接口:
POST /mqtt/client/list
- 描述: 查询连接到集群的客户端列表
- 请求参数:
json
{
"source_ip": "192.168.1.1", // 可选,按源IP过滤
"connection_id": 12345, // 可选,按连接ID过滤
"limit": 20, // 可选,每页大小
"page": 1, // 可选,页码
"sort_field": "connection_id", // 可选,排序字段
"sort_by": "desc", // 可选,排序方式
"filter_field": "protocol", // 可选,过滤字段
"filter_values": ["MQTT"], // 可选,过滤值
"exact_match": "true" // 可选,精确匹配
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"connection_id": 12345,
"connection_type": "TCP",
"protocol": "MQTT",
"source_addr": "192.168.1.100:52341",
"create_time": "2024-01-01 10:00:00"
}
],
"total_count": 100
}
}
3. 会话管理
3.1 会话列表查询
- 接口:
POST /mqtt/session/list
- 描述: 查询 MQTT 会话列表
- 请求参数:
json
{
"client_id": "client001", // 可选,按客户端ID过滤
"limit": 20,
"page": 1,
"sort_field": "create_time", // 可选,排序字段
"sort_by": "desc",
"filter_field": "client_id",
"filter_values": ["client001"],
"exact_match": "false"
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"client_id": "client001",
"session_expiry": 3600,
"is_contain_last_will": true,
"last_will_delay_interval": 30,
"create_time": 1640995200,
"connection_id": 12345,
"broker_id": 1,
"reconnect_time": 1640995300,
"distinct_time": 1640995400
}
],
"total_count": 50
}
}
4. 主题管理
4.1 主题列表查询
- 接口:
POST /mqtt/topic/list
- 描述: 查询 MQTT 主题列表
- 请求参数:
json
{
"topic_name": "sensor/+", // 可选,按主题名过滤
"limit": 20,
"page": 1,
"sort_field": "topic_name", // 可选,排序字段
"sort_by": "asc",
"filter_field": "topic_name",
"filter_values": ["sensor"],
"exact_match": "false"
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"topic_id": "topic_001",
"topic_name": "sensor/temperature",
"is_contain_retain_message": true
}
],
"total_count": 25
}
}
4.2 主题重写规则列表
- 接口:
POST /mqtt/topic-rewrite/list
- 描述: 查询主题重写规则列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"source_topic": "old/topic/+",
"dest_topic": "new/topic/$1",
"regex": "^old/topic/(.+)$",
"action": "All"
}
],
"total_count": 10
}
}
4.3 创建主题重写规则
- 接口:
POST /mqtt/topic-rewrite/create
- 描述: 创建新的主题重写规则
- 请求参数:
json
{
"action": "All", // 动作类型:All, Publish, Subscribe
"source_topic": "old/topic/+", // 源主题模式
"dest_topic": "new/topic/$1", // 目标主题模式
"regex": "^old/topic/(.+)$" // 正则表达式
}
- 响应: 成功返回 "success"
4.4 删除主题重写规则
- 接口:
POST /mqtt/topic-rewrite/delete
- 描述: 删除主题重写规则
- 请求参数:
json
{
"action": "All",
"source_topic": "old/topic/+"
}
- 响应: 成功返回 "success"
5. 订阅管理
5.1 订阅列表查询
- 接口:
POST /mqtt/subscribe/list
- 描述: 查询订阅列表
- 请求参数:
json
{
"client_id": "client001", // 可选,按客户端ID过滤
"limit": 20,
"page": 1,
"sort_field": "create_time",
"sort_by": "desc",
"filter_field": "client_id",
"filter_values": ["client001"],
"exact_match": "false"
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"client_id": "client001",
"path": "sensor/+",
"broker_id": 1,
"protocol": "MQTTv5",
"qos": "QoS1",
"no_local": 0,
"preserve_retain": 0,
"retain_handling": "SendAtSubscribe",
"create_time": "2024-01-01 10:00:00",
"pk_id": 1,
"properties": "{}",
"is_share_sub": false
}
],
"total_count": 30
}
}
5.2 订阅详情查询
- 接口:
POST /mqtt/subscribe/detail
- 描述: 查询订阅详情
- 请求参数: 空 JSON 对象
- 响应: 当前返回空字符串(功能待实现)
5.3 自动订阅规则管理
5.3.1 自动订阅列表
- 接口:
POST /mqtt/auto-subscribe/list
- 描述: 查询自动订阅规则列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"topic": "system/+",
"qos": "QoS1",
"no_local": false,
"retain_as_published": false,
"retained_handling": "SendAtSubscribe"
}
],
"total_count": 5
}
}
5.3.2 创建自动订阅规则
- 接口:
POST /mqtt/auto-subscribe/create
- 描述: 创建新的自动订阅规则
- 请求参数:
json
{
"topic": "system/+", // 主题模式
"qos": 1, // QoS 级别:0, 1, 2
"no_local": false, // 是否本地
"retain_as_published": false, // 保持发布状态
"retained_handling": 0 // 保留消息处理方式:0, 1, 2
}
- 响应: 成功返回 "success"
5.3.3 删除自动订阅规则
- 接口:
POST /mqtt/auto-subscribe/delete
- 描述: 删除自动订阅规则
- 请求参数:
json
{
"topic_name": "system/+"
}
- 响应: 成功返回 "success"
5.4 慢订阅监控
5.4.1 慢订阅列表
- 接口:
POST /mqtt/slow-subscribe/list
- 描述: 查询慢订阅列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"client_id": "slow_client",
"topic_name": "heavy/topic",
"time_span": 5000,
"node_info": "node1",
"create_time": "2024-01-01 10:00:00",
"subscribe_name": "sub001"
}
],
"total_count": 3
}
}
6. 用户管理
6.1 用户列表查询
- 接口:
POST /mqtt/user/list
- 描述: 查询 MQTT 用户列表
- 请求参数:
json
{
"user_name": "admin", // 可选,按用户名过滤
"limit": 20,
"page": 1,
"sort_field": "username", // 可选,排序字段
"sort_by": "asc",
"filter_field": "username",
"filter_values": ["admin"],
"exact_match": "false"
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"username": "admin",
"is_superuser": true
}
],
"total_count": 10
}
}
6.2 创建用户
- 接口:
POST /mqtt/user/create
- 描述: 创建新的 MQTT 用户
- 请求参数:
json
{
"username": "newuser", // 用户名
"password": "password123", // 密码
"is_superuser": false // 是否为超级用户
}
- 响应: 成功返回 "Created successfully!"
6.3 删除用户
- 接口:
POST /mqtt/user/delete
- 描述: 删除 MQTT 用户
- 请求参数:
json
{
"username": "olduser"
}
- 响应: 成功返回 "Deleted successfully!"
7. ACL 管理
7.1 ACL 列表查询
- 接口:
POST /mqtt/acl/list
- 描述: 查询访问控制列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"resource_type": "ClientId",
"resource_name": "client001",
"topic": "sensor/+",
"ip": "192.168.1.0/24",
"action": "Publish",
"permission": "Allow"
}
],
"total_count": 15
}
}
7.2 创建 ACL 规则
- 接口:
POST /mqtt/acl/create
- 描述: 创建新的 ACL 规则
- 请求参数:
json
{
"resource_type": "ClientId", // 资源类型:ClientId, Username, IpAddress
"resource_name": "client001", // 资源名称
"topic": "sensor/+", // 主题模式
"ip": "192.168.1.100", // IP地址
"action": "Publish", // 动作:Publish, Subscribe, All
"permission": "Allow" // 权限:Allow, Deny
}
- 响应: 成功返回 "Created successfully!"
7.3 删除 ACL 规则
- 接口:
POST /mqtt/acl/delete
- 描述: 删除 ACL 规则
- 请求参数:
json
{
"resource_type": "ClientId",
"resource_name": "client001",
"topic": "sensor/+",
"ip": "192.168.1.100",
"action": "Publish",
"permission": "Allow"
}
- 响应: 成功返回 "Deleted successfully!"
8. 黑名单管理
8.1 黑名单列表查询
- 接口:
POST /mqtt/blacklist/list
- 描述: 查询黑名单列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"blacklist_type": "ClientId",
"resource_name": "malicious_client",
"end_time": "2024-12-31 23:59:59",
"desc": "Blocked due to suspicious activity"
}
],
"total_count": 5
}
}
8.2 创建黑名单
- 接口:
POST /mqtt/blacklist/create
- 描述: 添加新的黑名单项
- 请求参数:
json
{
"blacklist_type": "ClientId", // 黑名单类型:ClientId, IpAddress, Username
"resource_name": "bad_client", // 资源名称
"end_time": 1735689599, // 结束时间(Unix时间戳)
"desc": "Blocked for security" // 描述
}
- 响应: 成功返回 "Created successfully!"
8.3 删除黑名单
- 接口:
POST /mqtt/blacklist/delete
- 描述: 删除黑名单项
- 请求参数:
json
{
"blacklist_type": "ClientId",
"resource_name": "bad_client"
}
- 响应: 成功返回 "Deleted successfully!"
9. 连接器管理
9.1 连接器列表查询
- 接口:
POST /mqtt/connector/list
- 描述: 查询连接器列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"connector_name": "kafka_connector",
"connector_type": "Kafka",
"config": "{\"bootstrap_servers\":\"localhost:9092\"}",
"topic_id": "topic_001",
"status": "Running",
"broker_id": "1",
"create_time": "2024-01-01 10:00:00",
"update_time": "2024-01-01 11:00:00"
}
],
"total_count": 8
}
}
9.2 创建连接器
- 接口:
POST /mqtt/connector/create
- 描述: 创建新的连接器
- 请求参数:
json
{
"connector_name": "new_connector", // 连接器名称
"connector_type": "Kafka", // 连接器类型
"config": "{\"bootstrap_servers\":\"localhost:9092\",\"topic\":\"mqtt_messages\"}", // 配置信息(JSON字符串)
"topic_id": "sensor/+" // 关联的主题ID
}
连接器类型和配置示例:
Kafka 连接器:
json
{
"connector_type": "Kafka",
"config": "{\"bootstrap_servers\":\"localhost:9092\",\"topic\":\"mqtt_messages\",\"acks\":\"all\"}"
}
文件连接器:
json
{
"connector_type": "File",
"config": "{\"path\":\"/tmp/mqtt_messages.log\",\"max_file_size\":\"100MB\"}"
}
- 响应: 成功返回 "Created successfully!"
9.3 删除连接器
- 接口:
POST /mqtt/connector/delete
- 描述: 删除连接器
- 请求参数:
json
{
"connector_name": "old_connector"
}
- 响应: 成功返回 "Deleted successfully!"
10. Schema 管理
10.1 Schema 列表查询
- 接口:
POST /mqtt/schema/list
- 描述: 查询 Schema 列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"name": "temperature_schema",
"schema_type": "json",
"desc": "Temperature sensor data schema",
"schema": "{\"type\":\"object\",\"properties\":{\"temp\":{\"type\":\"number\"},\"unit\":{\"type\":\"string\"}}}"
}
],
"total_count": 12
}
}
10.2 创建 Schema
- 接口:
POST /mqtt/schema/create
- 描述: 创建新的 Schema
- 请求参数:
json
{
"schema_name": "sensor_data_schema", // Schema名称
"schema_type": "json", // Schema类型:json, avro, protobuf
"schema": "{\"type\":\"object\",\"properties\":{\"temperature\":{\"type\":\"number\"},\"humidity\":{\"type\":\"number\"}}}", // Schema定义
"desc": "Sensor data validation schema" // 描述
}
Schema 类型示例:
JSON Schema:
json
{
"schema_type": "json",
"schema": "{\"type\":\"object\",\"properties\":{\"temperature\":{\"type\":\"number\",\"minimum\":-50,\"maximum\":100}}}"
}
AVRO Schema:
json
{
"schema_type": "avro",
"schema": "{\"type\":\"record\",\"name\":\"SensorData\",\"fields\":[{\"name\":\"temperature\",\"type\":\"double\"}]}"
}
- 响应: 成功返回 "Created successfully!"
10.3 删除 Schema
- 接口:
POST /mqtt/schema/delete
- 描述: 删除 Schema
- 请求参数:
json
{
"schema_name": "old_schema"
}
- 响应: 成功返回 "Deleted successfully!"
10.4 Schema 绑定管理
10.4.1 Schema 绑定列表查询
- 接口:
POST /mqtt/schema-bind/list
- 描述: 查询 Schema 绑定关系列表
- 请求参数:
json
{
"resource_name": "sensor/temperature", // 可选,资源名称过滤
"schema_name": "temp_schema", // 可选,Schema名称过滤
"limit": 20,
"page": 1,
"sort_field": "data_type",
"sort_by": "asc",
"filter_field": "data_type",
"filter_values": ["resource"],
"exact_match": "false"
}
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"data_type": "resource",
"data": ["sensor_data_schema", "device_status_schema"]
}
],
"total_count": 2
}
}
10.4.2 创建 Schema 绑定
- 接口:
POST /mqtt/schema-bind/create
- 描述: 创建 Schema 与资源的绑定关系
- 请求参数:
json
{
"schema_name": "sensor_data_schema", // Schema名称
"resource_name": "sensor/temperature" // 资源名称(通常是主题名)
}
- 响应: 成功返回 "Created successfully!"
10.4.3 删除 Schema 绑定
- 接口:
POST /mqtt/schema-bind/delete
- 描述: 删除 Schema 绑定关系
- 请求参数:
json
{
"schema_name": "sensor_data_schema",
"resource_name": "sensor/temperature"
}
- 响应: 成功返回 "Deleted successfully!"
11. 系统监控
11.1 系统告警列表
- 接口:
POST /mqtt/system-alarm/list
- 描述: 查询系统告警列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"name": "High Memory Usage",
"message": "Memory usage exceeded 80% threshold",
"activate_at": "2024-01-01 10:00:00",
"activated": true
}
],
"total_count": 3
}
}
11.2 连接抖动检测列表
- 接口:
POST /mqtt/flapping_detect/list
- 描述: 查询连接抖动检测列表
- 请求参数: 支持通用分页和过滤参数
- 响应数据结构:
json
{
"code": 0,
"message": "success",
"data": {
"data": [
{
"client_id": "flapping_client",
"before_last_windows_connections": 15,
"first_request_time": 1640995200
}
],
"total_count": 2
}
}
枚举值说明
ACL 资源类型 (resource_type)
ClientId
: 客户端IDUsername
: 用户名IpAddress
: IP地址
ACL 动作 (action)
Publish
: 发布消息Subscribe
: 订阅主题All
: 所有动作
ACL 权限 (permission)
Allow
: 允许Deny
: 拒绝
黑名单类型 (blacklist_type)
ClientId
: 客户端IDIpAddress
: IP地址Username
: 用户名
连接器类型 (connector_type)
Kafka
: Kafka消息队列File
: 本地文件
Schema 类型 (schema_type)
json
: JSON Schemaavro
: Apache Avroprotobuf
: Protocol Buffers
QoS 级别
0
: 最多一次传递1
: 至少一次传递2
: 恰好一次传递
保留消息处理方式 (retained_handling)
0
: 发送保留消息1
: 仅在新订阅时发送保留消息2
: 不发送保留消息
使用示例
查询集群概览
bash
curl -X POST http://localhost:8080/mqtt/overview \
-H "Content-Type: application/json" \
-d '{}'
查询客户端列表
bash
curl -X POST http://localhost:8080/mqtt/client/list \
-H "Content-Type: application/json" \
-d '{
"limit": 10,
"page": 1,
"sort_field": "connection_id",
"sort_by": "desc"
}'
创建用户
bash
curl -X POST http://localhost:8080/mqtt/user/create \
-H "Content-Type: application/json" \
-d '{
"username": "testuser",
"password": "testpass123",
"is_superuser": false
}'
创建ACL规则
bash
curl -X POST http://localhost:8080/mqtt/acl/create \
-H "Content-Type: application/json" \
-d '{
"resource_type": "ClientId",
"resource_name": "sensor001",
"topic": "sensor/+",
"ip": "192.168.1.100",
"action": "Publish",
"permission": "Allow"
}'
创建连接器
bash
curl -X POST http://localhost:8080/mqtt/connector/create \
-H "Content-Type: application/json" \
-d '{
"connector_name": "kafka_bridge",
"connector_type": "Kafka",
"config": "{\"bootstrap_servers\":\"localhost:9092\",\"topic\":\"mqtt_messages\"}",
"topic_id": "sensor/+"
}'
创建Schema
bash
curl -X POST http://localhost:8080/mqtt/schema/create \
-H "Content-Type: application/json" \
-d '{
"schema_name": "sensor_schema",
"schema_type": "json",
"schema": "{\"type\":\"object\",\"properties\":{\"temperature\":{\"type\":\"number\"},\"humidity\":{\"type\":\"number\"}}}",
"desc": "Sensor data validation schema"
}'
文档版本: v3.0
最后更新: 2024-01-01
基于代码版本: RobustMQ Admin Server v0.1.31