Skip to content

集群管理 HTTP API

本文档介绍集群配置和管理相关的 HTTP API 接口。通用信息请参考 COMMON.md

通用响应格式

所有接口均返回统一的 JSON 响应结构:

成功响应:

json
{
  "code": 0,
  "data": "...",
  "error": null
}

失败响应:

json
{
  "code": 100,
  "data": "",
  "error": "错误信息"
}

集群配置管理

1. 获取集群配置

  • 接口: GET /api/cluster/config/get

  • 描述: 获取当前集群的完整 BrokerConfig 配置信息

  • 请求参数: 无

  • 响应示例:

json
{
  "code": 0,
  "data": {
    "cluster_name": "robust_mq_cluster_default",
    "broker_id": 1,
    "broker_ip": "192.168.1.100",
    "roles": ["broker", "meta"],
    "grpc_port": 1228,
    "http_port": 8080,
    "meta_addrs": {
      "1": "127.0.0.1:1228"
    },
    "prometheus": {
      "enable": true,
      "port": 9090
    },
    "log": {
      "log_path": "./logs",
      "log_config": "./config/broker-tracing.toml"
    },
    "runtime": {
      "runtime_worker_threads": 1,
      "server_worker_threads": 0,
      "meta_worker_threads": 0,
      "broker_worker_threads": 0,
      "channels_per_address": 10,
      "tls_cert": "./config/certs/cert.pem",
      "tls_key": "./config/certs/key.pem"
    },
    "network": {
      "accept_thread_num": 8,
      "handler_thread_num": 32,
      "queue_size": 1000
    },
    "pprof": {
      "enable": false,
      "port": 6060,
      "frequency": 100
    },
    "rocksdb": {
      "data_path": "./data",
      "max_open_files": 10000
    },
    "llm_client": null,
    "cluster_limit": {
      "max_network_connection": 100000000,
      "max_network_connection_rate": 10000,
      "max_admin_http_uri_rate": 50
    },
    "meta_runtime": {
      "heartbeat_timeout_ms": 30000,
      "heartbeat_check_time_ms": 1000,
      "raft_write_timeout_sec": 30,
      "offset_raft_group_num": 1,
      "data_raft_group_num": 1
    },
    "storage_runtime": {
      "tcp_port": 1778,
      "max_segment_size": 1073741824,
      "io_thread_num": 8,
      "data_path": [],
      "offset_enable_cache": true
    },
    "mqtt_server": {
      "tcp_port": 1883,
      "tls_port": 1885,
      "websocket_port": 8083,
      "websockets_port": 8085,
      "quic_port": 9083
    },
    "mqtt_keep_alive": {
      "enable": true,
      "default_time": 180,
      "max_time": 3600,
      "default_timeout": 2
    },
    "mqtt_runtime": {
      "default_user": "admin",
      "default_password": "robustmq",
      "durable_sessions_enable": false,
      "secret_free_login": false,
      "is_self_protection_status": false
    },
    "mqtt_offline_message": {
      "enable": true,
      "expire_ms": 0,
      "max_messages_num": 0
    },
    "mqtt_slow_subscribe": {
      "enable": false,
      "record_time": 1000,
      "delay_type": "Whole"
    },
    "mqtt_flapping_detect": {
      "enable": false,
      "window_time": 1,
      "max_client_connections": 15,
      "ban_time": 5
    },
    "mqtt_protocol": {
      "max_session_expiry_interval": 1800,
      "default_session_expiry_interval": 30,
      "topic_alias_max": 65535,
      "max_packet_size": 10485760,
      "receive_max": 65535,
      "max_message_expiry_interval": 3600,
      "client_pkid_persistent": false
    },
    "mqtt_schema": {
      "enable": true,
      "strategy": "ALL",
      "failed_operation": "Discard",
      "echo_log": true,
      "log_level": "info"
    },
    "mqtt_system_monitor": {
      "enable": false,
      "os_cpu_high_watermark": 70.0,
      "os_memory_high_watermark": 80.0,
      "system_topic_interval_ms": 60000
    },
    "mqtt_limit": {
      "cluster": {
        "max_connections_per_node": 10000000,
        "max_connection_rate": 100000,
        "max_topics": 5000000,
        "max_sessions": 50000000,
        "max_publish_rate": 10000
      },
      "tenant": {
        "max_connections_per_node": 1000000,
        "max_connection_rate": 10000,
        "max_topics": 500000,
        "max_sessions": 5000000,
        "max_publish_rate": 10000
      }
    }
  },
  "error": null
}

2. 设置集群配置

  • 接口: POST /api/cluster/config/set
  • 描述: 动态更新集群配置,修改立即生效并持久化到 Meta 存储
  • 请求参数:
字段类型必填说明
config_typestring配置类型,见下方可选值
configstring对应类型的配置 JSON 字符串

config_type 可选值及对应 config 字段结构:


MqttSlowSubscribeConfig — 慢订阅检测

字段类型默认值说明
enableboolfalse是否启用
record_timeu641000慢订阅阈值(ms),超过该时间的订阅推送将被记录
delay_typestring"CreateTime"延迟统计方式:"CreateTime""PublishTime"
json
{
  "config_type": "MqttSlowSubscribeConfig",
  "config": "{\"enable\":true,\"record_time\":500,\"delay_type\":\"CreateTime\"}"
}

MqttFlappingDetect — 连接抖动检测

字段类型默认值说明
enableboolfalse是否启用
window_timeu3260检测窗口时间(秒)
max_client_connectionsu645窗口内最大允许连接次数,超过则触发封禁
ban_timeu32300封禁时长(秒)
json
{
  "config_type": "MqttFlappingDetect",
  "config": "{\"enable\":true,\"window_time\":60,\"max_client_connections\":5,\"ban_time\":300}"
}

MqttProtocol — MQTT 协议参数

字段类型默认值说明
max_session_expiry_intervalu322592000最大 Session 过期时间(秒)
default_session_expiry_intervalu323600默认 Session 过期时间(秒)
topic_alias_maxu1665535最大 Topic Alias 数量
max_packet_sizeu3210485760最大报文大小(字节,默认 10MB)
receive_maxu1665535接收窗口大小
max_message_expiry_intervalu6486400消息最大过期时间(秒)
client_pkid_persistentboolfalse是否持久化客户端 Packet ID
json
{
  "config_type": "MqttProtocol",
  "config": "{\"max_session_expiry_interval\":2592000,\"default_session_expiry_interval\":3600,\"topic_alias_max\":65535,\"max_packet_size\":10485760,\"receive_max\":65535,\"max_message_expiry_interval\":86400,\"client_pkid_persistent\":false}"
}

MqttOfflineMessage — 离线消息

字段类型默认值说明
enableboolfalse是否启用
expire_msu3286400000离线消息过期时间(ms)
max_messages_numu321000每个客户端最多保存的离线消息数
json
{
  "config_type": "MqttOfflineMessage",
  "config": "{\"enable\":true,\"expire_ms\":86400000,\"max_messages_num\":1000}"
}

MqttSystemMonitor — 系统监控

字段类型默认值说明
enableboolfalse是否启用系统监控
os_cpu_high_watermarkf320.7CPU 使用率高水位(0~1)
os_memory_high_watermarkf320.8内存使用率高水位(0~1)
system_topic_interval_msu6460000系统 Topic 上报间隔(ms)
json
{
  "config_type": "MqttSystemMonitor",
  "config": "{\"enable\":true,\"os_cpu_high_watermark\":0.7,\"os_memory_high_watermark\":0.8,\"system_topic_interval_ms\":60000}"
}

MqttSchema — Schema 校验

字段类型默认值说明
enableboolfalse是否启用 Schema 校验
strategystring"Forward"校验策略:"Forward"(透传)或 "Strict"
failed_operationstring"Disconnect"校验失败操作:"Disconnect""Ignore"
echo_logboolfalse是否打印 Schema 校验日志
log_levelstring"info"日志级别
json
{
  "config_type": "MqttSchema",
  "config": "{\"enable\":true,\"strategy\":\"Strict\",\"failed_operation\":\"Disconnect\",\"echo_log\":false,\"log_level\":\"info\"}"
}

MqttLimit — MQTT 流量限制

字段类型说明
clusterobject集群级别限制,见 LimitQuota
tenantobject租户默认限制,见 LimitQuota

LimitQuota 字段

字段类型说明
max_connections_per_nodeu64每节点最大连接数
max_connection_rateu32最大连接速率(连接/秒)
max_topicsu64最大 Topic 数
max_sessionsu64最大 Session 数
max_publish_rateu32最大发布速率(消息/秒)
json
{
  "config_type": "MqttLimit",
  "config": "{\"cluster\":{\"max_connections_per_node\":10000000,\"max_connection_rate\":100000,\"max_topics\":5000000,\"max_sessions\":50000000,\"max_publish_rate\":10000},\"tenant\":{\"max_connections_per_node\":1000000,\"max_connection_rate\":10000,\"max_topics\":500000,\"max_sessions\":5000000,\"max_publish_rate\":10000}}"
}

ClusterLimit — 集群接入限制

字段类型默认值说明
max_network_connectionu64100000000最大网络连接总数
max_network_connection_rateu3210000最大网络连接速率(连接/秒)
max_admin_http_uri_rateu3250Admin HTTP 接口最大请求速率(次/秒)
json
{
  "config_type": "ClusterLimit",
  "config": "{\"max_network_connection\":100000000,\"max_network_connection_rate\":10000,\"max_admin_http_uri_rate\":50}"
}

  • 响应示例:
json
{
  "code": 0,
  "data": "success",
  "error": null
}

集群信息

3. 获取集群状态

  • 接口: GET /api/cluster/status

  • 描述: 获取集群运行状态,包括版本、节点列表、各 Raft Group 的 Meta 状态等

  • 响应示例:

json
{
  "code": 0,
  "data": {
    "version": "0.3.0",
    "cluster_name": "robust_mq_cluster_default",
    "start_time": 1738800000,
    "broker_node_list": [],
    "nodes": ["127.0.0.1"],
    "meta": {
      "mqtt": {
        "running_state": { "Ok": null },
        "id": 1,
        "current_term": 1,
        "vote": { "leader_id": { "term": 1, "node_id": 1 }, "committed": true },
        "last_log_index": 30001,
        "last_applied": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "snapshot": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "purged": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "state": "Leader",
        "current_leader": 1,
        "millis_since_quorum_ack": 0,
        "membership_config": {
          "log_id": { "leader_id": { "term": 0, "node_id": 0 }, "index": 0 },
          "membership": {
            "configs": [[1]],
            "nodes": { "1": { "node_id": 1, "rpc_addr": "127.0.0.1:1228" } }
          }
        },
        "replication": { "1": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 } }
      },
      "offset": {
        "state": "Leader",
        "current_leader": 1,
        "last_log_index": 1,
        "..."  : "..."
      },
      "meta": {
        "state": "Leader",
        "current_leader": 1,
        "last_log_index": 42853,
        "..." : "..."
      }
    }
  },
  "error": null
}

data 字段说明

字段类型说明
versionstring当前 Broker 版本号
cluster_namestring集群名称
start_timeu64进程启动时间戳(Unix 秒)
broker_node_listarray集群中所有 Broker 节点信息列表
nodesstring[]集群节点 IP 列表(去重后)
metaobject各 Raft Group 的运行状态,key 为 group 名称

meta 的 key 说明

key说明
mqttMQTT 数据 Raft Group 状态
offsetOffset 数据 Raft Group 状态
meta元数据 Raft Group 状态

每个 Raft Group 状态(MetaStatus)字段说明

字段类型说明
running_stateobject运行状态,{"Ok": null} 表示正常
idu64当前节点 ID
current_termu64当前 Raft term
voteobject当前投票信息
last_log_indexu64最新日志索引
last_appliedobject已应用到状态机的最新日志位置
snapshotobject/null最新快照位置
purgedobject/null已清理的最旧日志位置
statestring节点角色:LeaderFollowerCandidate
current_leaderu64当前 Leader 节点 ID
millis_since_quorum_acku64距上次 quorum 确认的毫秒数
membership_configobject集群成员配置信息
replicationobject各节点的复制进度,key 为节点 ID

返回值字段说明

BrokerConfig 各部分说明

基础配置

字段类型说明
cluster_namestring集群名称
broker_idu64Broker 节点 ID
broker_ipstring/nullBroker IP 地址
rolesstring[]节点角色,可选 metabrokerengine
grpc_portu32gRPC 服务端口
http_portu32HTTP API 服务端口
meta_addrsobjectMeta 节点地址映射(节点ID → 地址)

prometheus

字段类型说明
enablebool是否启用 Prometheus 指标
portu16Prometheus 指标暴露端口

log

字段类型说明
log_pathstring日志输出目录
log_configstring日志 tracing 配置文件路径

runtime

字段类型说明
runtime_worker_threadsusize兼容旧版全局线程倍数,各运行时字段为 0 时作为回退值
server_worker_threadsusizeServer 运行时线程数(0 = 自动,等于 CPU 核心数)
meta_worker_threadsusizeMeta 运行时线程数(0 = 自动)
broker_worker_threadsusizeBroker 运行时线程数(0 = 自动,热路径运行时)
channels_per_addressusize每个地址的 gRPC 连接通道数
tls_certstringTLS 证书文件路径
tls_keystringTLS 私钥文件路径

network

字段类型说明
accept_thread_numusize连接接受线程数
handler_thread_numusize消息处理线程数
queue_sizeusize内部队列大小

pprof

字段类型说明
enablebool是否启用 pprof 性能分析
portu16pprof HTTP 端口
frequencyi32采样频率

meta_runtime

字段类型说明
heartbeat_timeout_msu64心跳超时时间(毫秒)
heartbeat_check_time_msu64心跳检查间隔(毫秒)
raft_write_timeout_secu64Raft 写入超时(秒)
offset_raft_group_numu32Offset Raft 分片组数量(默认 1)
data_raft_group_numu32数据 Raft 分片组数量(默认 1)

rocksdb

字段类型说明
data_pathstringRocksDB 数据目录
max_open_filesi32最大打开文件数

storage_runtime

字段类型说明
tcp_portu32存储引擎 TCP 端口
max_segment_sizeu32最大段文件大小(字节)
io_thread_numu32IO 线程数
data_pathstring[]数据存储路径列表
offset_enable_cachebool是否启用 Offset 缓存

mqtt_server

字段类型说明
tcp_portu32MQTT TCP 端口
tls_portu32MQTT TLS 端口
websocket_portu32MQTT WebSocket 端口
websockets_portu32MQTT WebSocket Secure 端口
quic_portu32MQTT QUIC 端口

mqtt_keep_alive

字段类型说明
enablebool是否启用 Keep Alive 检测
default_timeu16默认 Keep Alive 时间(秒)
max_timeu16最大 Keep Alive 时间(秒)
default_timeoutu16超时倍数

mqtt_runtime

字段类型说明
default_userstring默认用户名
default_passwordstring默认密码
durable_sessions_enablebool是否启用持久化会话
secret_free_loginbool是否允许免密登录
is_self_protection_statusbool是否处于自我保护状态

mqtt_offline_message

字段类型说明
enablebool是否启用离线消息
expire_msu32过期时间(毫秒),0 表示不过期
max_messages_numu32最大离线消息数量,0 表示不限制

mqtt_slow_subscribe

字段类型说明
enablebool是否启用慢订阅检测
record_timeu64记录阈值时间(毫秒)
delay_typestring延迟类型:WholeInternalResponse

mqtt_flapping_detect

字段类型说明
enablebool是否启用连接抖动检测
window_timeu32时间窗口(分钟)
max_client_connectionsu64窗口内最大连接次数
ban_timeu32封禁时间(分钟)

mqtt_protocol

字段类型说明
max_session_expiry_intervalu32最大会话过期间隔(秒)
default_session_expiry_intervalu32默认会话过期间隔(秒)
topic_alias_maxu16Topic Alias 最大值
max_qos_flight_messageu8QoS 飞行窗口最大消息数
max_packet_sizeu32最大报文大小(字节)
receive_maxu16Receive Maximum
max_message_expiry_intervalu64最大消息过期间隔(秒)
client_pkid_persistentbool客户端 Packet ID 是否持久化

mqtt_schema

字段类型说明
enablebool是否启用 Schema 校验
strategystring校验策略:ALLAny
failed_operationstring校验失败处理:DiscardDisconnectAndDiscardIgnore
echo_logbool是否输出校验日志
log_levelstring日志级别

mqtt_system_monitor

字段类型说明
enablebool是否启用系统监控
os_cpu_high_watermarkf32CPU 高水位线(百分比)
os_memory_high_watermarkf32内存高水位线(百分比)
system_topic_interval_msu64系统 Topic 指标发布间隔(毫秒)

cluster_limit

字段类型说明
max_network_connectionu64集群最大网络连接数
max_network_connection_rateu32集群每秒最大新建连接速率
max_admin_http_uri_rateu32Admin HTTP 接口每秒最大请求速率

mqtt_limit

clustertenant 均为 LimitQuota 结构:

字段类型说明
max_connections_per_nodeu64每节点最大连接数
max_connection_rateu32每秒最大新建连接速率
max_topicsu64最大主题数
max_sessionsu64最大会话数
max_publish_rateu32每秒最大发布速率

llm_client

可选配置,不填时为 null

字段类型说明
platformstringLLM 平台,如 open_aianthropicollama
modelstring模型名称
tokenstring/nullAPI Token(Ollama 不需要)
base_urlstring/null自定义 API 地址(需以 http://https:// 开头)

使用示例

获取集群配置

bash
curl -X GET http://localhost:8080/api/cluster/config/get

获取集群状态

bash
curl -X GET http://localhost:8080/api/cluster/status

设置连接抖动检测配置

bash
curl -X POST http://localhost:8080/api/cluster/config/set \
  -H "Content-Type: application/json" \
  -d '{
    "config_type": "FlappingDetect",
    "config": "{\"enable\":true,\"window_time\":2,\"max_client_connections\":20,\"ban_time\":10}"
  }'


租户管理

租户(Tenant)是 RobustMQ 多租户架构的核心概念,用于对集群资源进行逻辑隔离。适用于同一集群服务多个业务、多个环境(开发/测试/生产)等场景。

4. 获取租户列表

  • 接口: GET /api/tenant/list
  • 描述: 获取集群中所有租户,支持分页、排序、过滤
  • 请求参数(Query String):
字段类型必填说明
tenant_namestring按租户名称模糊查询(包含匹配)
pageu32页码,从 1 开始,默认 1
limitu32每页条数,默认 100
sort_fieldstring排序字段,支持 tenant_name
sort_bystring排序方向:asc / desc
filter_fieldstring过滤字段名
filter_valuesstring[]过滤值列表
exact_matchstring是否精确匹配:exact
  • 响应示例:
json
{
  "code": 0,
  "data": {
    "data": [
      {
        "tenant_name": "business-a",
        "desc": "业务 A 租户",
        "config": {
          "max_connections_per_node": 10000000,
          "max_create_connection_rate_per_second": 10000,
          "max_topics": 5000000,
          "max_sessions": 50000000,
          "max_publish_rate": 10000
        },
        "create_time": 1738800000
      }
    ],
    "total_count": 1
  },
  "error": null
}
  • curl 示例:
bash
# 查询所有租户
curl -X GET "http://localhost:8080/api/tenant/list"

# 模糊查询租户名包含 "business" 的租户
curl -X GET "http://localhost:8080/api/tenant/list?tenant_name=business"

5. 创建租户

  • 接口: POST /api/tenant/create
  • 描述: 创建一个新租户
  • 请求体:
字段类型必填校验说明
tenant_namestring长度 1-128租户名称,全局唯一
descstring长度 ≤ 500租户描述
configobject-租户资源配额配置,不填则使用默认值
config.max_connections_per_nodeu64-每节点最大连接数(默认 10000000)
config.max_create_connection_rate_per_secondu32-每秒最大新建连接速率(默认 10000)
config.max_topicsu64-最大主题数(默认 5000000)
config.max_sessionsu64-最大会话数(默认 50000000)
config.max_publish_rateu32-每秒最大发布消息速率(默认 10000)
  • 请求示例:
json
{
  "tenant_name": "business-a",
  "desc": "业务 A 租户",
  "config": {
    "max_connections_per_node": 50000,
    "max_topics": 100000,
    "max_sessions": 200000,
    "max_publish_rate": 5000
  }
}
  • 响应示例:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • curl 示例:
bash
curl -X POST http://localhost:8080/api/tenant/create \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a", "desc": "业务 A 租户"}'

6. 删除租户

  • 接口: POST /api/tenant/delete
  • 描述: 删除指定租户。删除后,归属该租户的元数据将不再受该租户管控。
  • 请求体:
字段类型必填校验说明
tenant_namestring长度 1-128要删除的租户名称
  • 请求示例:
json
{
  "tenant_name": "business-a"
}
  • 响应示例:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • curl 示例:
bash
curl -X POST http://localhost:8080/api/tenant/delete \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a"}'

7. 更新租户

  • 接口: POST /api/tenant/update
  • 描述: 更新租户的描述和资源配额。租户必须已存在。config 不传时保持原有配置不变。
  • 请求体:
字段类型必填校验说明
tenant_namestring长度 1-128要更新的租户名称
descstring长度 ≤ 500新的租户描述
configobject-租户资源配额配置,不填则保持原有配置不变
config.max_connections_per_nodeu64-每节点最大连接数
config.max_create_connection_rate_per_secondu32-每秒最大新建连接速率
config.max_topicsu64-最大主题数
config.max_sessionsu64-最大会话数
config.max_publish_rateu32-每秒最大发布消息速率
  • 请求示例:
json
{
  "tenant_name": "business-a",
  "desc": "业务 A 租户(已更新)",
  "config": {
    "max_connections_per_node": 100000,
    "max_publish_rate": 20000
  }
}
  • 响应示例:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • 错误响应(租户不存在时):
json
{
  "code": 1,
  "message": "Tenant business-a not found",
  "data": null
}
  • curl 示例:
bash
curl -X POST http://localhost:8080/api/tenant/update \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a", "desc": "业务 A 租户(已更新)", "config": {"max_connections_per_node": 100000}}'

健康检查

8. 集群存活检查

  • 接口: GET /cluster/healthy
  • 描述: 检查服务是否存活,返回 true 表示正常
  • 请求参数: 无
  • 响应示例:
json
{
  "code": 0,
  "message": "success",
  "data": true
}

9. 就绪检查

  • 接口: GET /health/ready
  • 描述: 检查所有配置的端口是否就绪,用于 K8s readiness probe
  • 请求参数: 无
  • 响应:
    • 200 OK — 所有端口就绪:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "ready",
    "message": "all configured ports are ready"
  }
}
  • 503 Service Unavailable — 端口未就绪:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "not_ready",
    "check_type": "ready",
    "message": "one or more configured ports are not ready"
  }
}

10. 节点健康检查

  • 接口: GET /health/node
  • 描述: 节点级健康检查(占位实现,始终返回 ok)
  • 请求参数: 无
  • 响应示例:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "node",
    "message": "health check placeholder"
  }
}

11. 集群健康检查

  • 接口: GET /health/cluster
  • 描述: 集群级健康检查(占位实现,始终返回 ok)
  • 请求参数: 无
  • 响应示例:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "cluster",
    "message": "health check placeholder"
  }
}

12. 主题管理

主题、ACL、黑名单、连接器、Schema、租户均为集群级公共资源,接口统一使用 /cluster 前缀。

12.1 主题列表查询

  • 接口: GET /api/cluster/topic/list
  • 描述: 查询主题列表,支持按租户过滤、主题名模糊搜索和主题类型过滤
  • 请求参数:
字段类型必填说明
tenantstring按租户精确过滤
topic_namestring按主题名模糊搜索
topic_typestringall(默认)、normalsystem(含 $
limitu32每页大小
pageu32页码,从 1 开始
sort_fieldstring排序字段:topic_nametenant
sort_bystringasc / desc
  • 响应示例:
json
{ "code": 0, "data": { "data": [ { "topic_id": "...", "topic_name": "sensor/temperature", "tenant": "default" } ], "total_count": 25 } }

12.2 主题详情查询

  • 接口: GET /api/cluster/topic/detail
  • 描述: 查询指定主题的详细信息(保留消息、订阅列表、存储分片)
  • 请求参数: tenant(必填)、topic_name(必填)

12.3 删除主题

  • 接口: POST /api/cluster/topic/delete
  • 描述: 删除指定主题(软删除,后台异步清理分片数据)
  • 请求参数: { "tenant": "default", "topic_name": "sensor/temperature" }
  • 响应: 成功返回 "success"

12.4 主题重写规则列表

  • 接口: GET /api/cluster/topic-rewrite/list
  • 请求参数: tenantnamelimitpagesort_fieldsort_by

12.5 创建主题重写规则

  • 接口: POST /api/cluster/topic-rewrite/create
  • 请求参数: { "name", "tenant", "action", "source_topic", "dest_topic", "regex", "desc" }

12.6 删除主题重写规则

  • 接口: POST /api/cluster/topic-rewrite/delete
  • 请求参数: { "tenant": "default", "name": "my-rule" }

13. ACL 管理

13.1 ACL 列表查询

  • 接口: GET /api/cluster/acl/list
  • 描述: 查询 ACL 规则列表,支持 tenantnameresource_name 模糊搜索
  • 请求参数: tenantnameresource_namelimitpagesort_fieldsort_by

13.2 创建 ACL 规则

  • 接口: POST /api/cluster/acl/create
  • 请求参数:
json
{
  "tenant": "default",
  "name": "rule-name",
  "resource_type": "ClientId",
  "resource_name": "client001",
  "action": "Publish",
  "permission": "Allow",
  "topic": "sensor/+",
  "ip": "",
  "desc": ""
}
  • resource_type: ClientId | User | Ip
  • action: Publish | Subscribe | All
  • permission: Allow | Deny

13.3 删除 ACL 规则

  • 接口: POST /api/cluster/acl/delete
  • 请求参数: { "tenant": "default", "name": "rule-name" }

14. 黑名单管理

14.1 黑名单列表查询

  • 接口: GET /api/cluster/blacklist/list
  • 请求参数: tenantnameresource_namelimitpagesort_fieldsort_by

14.2 创建黑名单

  • 接口: POST /api/cluster/blacklist/create
  • 请求参数:
json
{
  "name": "bl-bad-client",
  "tenant": "default",
  "blacklist_type": "ClientId",
  "resource_name": "bad_client",
  "end_time": 1735689599,
  "desc": ""
}
  • blacklist_type: ClientId | User | Ip | ClientIdMatch | UserMatch | IPCIDR

14.3 删除黑名单

  • 接口: POST /api/cluster/blacklist/delete
  • 请求参数: { "tenant": "default", "name": "bl-bad-client" }

15. 连接器管理

15.1 连接器列表查询

  • 接口: GET /api/cluster/connector/list
  • 请求参数: tenantconnector_namelimitpagesort_fieldsort_by

15.2 连接器详情

  • 接口: GET /api/cluster/connector/detail
  • 请求参数: tenantconnector_name

15.3 创建连接器

  • 接口: POST /api/cluster/connector/create
  • 请求参数:
json
{
  "connector_name": "kafka-sink",
  "connector_type": "kafka",
  "tenant": "default",
  "topic_name": "sensor/temperature",
  "config": "{ ... }",
  "failure_strategy": { "strategy": "discard" }
}
  • connector_type: kafka | pulsar | rabbitmq | postgres | mysql | mongodb | elasticsearch | redis | webhook | opentsdb | mqtt | clickhouse | influxdb | cassandra | s3 | file | greptime
  • failure_strategy.strategy: discard | discard_after_retry | dead_message_queue

15.4 删除连接器

  • 接口: POST /api/cluster/connector/delete
  • 请求参数: { "tenant": "default", "connector_name": "kafka-sink" }

16. Schema 管理

16.1 Schema 列表查询

  • 接口: GET /api/cluster/schema/list
  • 请求参数: tenantnamelimitpagesort_fieldsort_by

16.2 创建 Schema

  • 接口: POST /api/cluster/schema/create
  • 请求参数:
json
{
  "tenant": "default",
  "schema_name": "sensor-schema",
  "schema_type": "json",
  "schema": "{ \"type\": \"object\" }",
  "desc": ""
}
  • schema_type: json | avro | protobuf

16.3 删除 Schema

  • 接口: POST /api/cluster/schema/delete
  • 请求参数: { "tenant": "default", "schema_name": "sensor-schema" }

16.4 Schema 绑定列表

  • 接口: GET /api/cluster/schema-bind/list
  • 请求参数: tenantresource_nameschema_namelimitpage

16.5 创建 Schema 绑定

  • 接口: POST /api/cluster/schema-bind/create
  • 请求参数: { "tenant": "default", "schema_name": "sensor-schema", "resource_name": "sensor/+" }

16.6 删除 Schema 绑定

  • 接口: POST /api/cluster/schema-bind/delete
  • 请求参数: { "tenant": "default", "schema_name": "sensor-schema", "resource_name": "sensor/+" }

17. 租户管理(集群级)

以下接口为集群级租户管理,替代原 /mqtt/tenant/* 接口。

17.1 租户列表查询

  • 接口: GET /api/cluster/tenant/list
  • 请求参数: tenant_namelimitpagesort_fieldsort_by

17.2 创建租户

  • 接口: POST /api/cluster/tenant/create
  • 请求参数:
json
{
  "tenant_name": "my-tenant",
  "desc": "",
  "max_connections_per_node": 10000,
  "max_create_connection_rate_per_second": 100,
  "max_topics": 10000,
  "max_sessions": 10000,
  "max_publish_rate": 10000
}

17.3 删除租户

  • 接口: POST /api/cluster/tenant/delete
  • 请求参数: { "tenant_name": "my-tenant" }

注意事项

  1. 响应格式: 成功时 code0errornull;失败时 code100error 包含错误信息
  2. 配置格式: config_set 接口的 config 字段必须是有效的 JSON 字符串
  3. 热更新: 部分配置支持热更新,无需重启服务
  4. 备份建议: 修改配置前建议先通过 config/get 接口获取当前配置进行备份
  5. 租户隔离: 租户为逻辑隔离,适用于私有化部署场景;SaaS 多租户场景建议使用物理隔离方案
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀