Skip to content

Cluster Management HTTP API

This document describes HTTP API interfaces related to cluster configuration and management. For general information, please refer to COMMON.md.

Common Response Format

All interfaces return a unified JSON response structure:

Success Response:

json
{
  "code": 0,
  "data": "...",
  "error": null
}

Error Response:

json
{
  "code": 100,
  "data": "",
  "error": "error message"
}

Cluster Configuration Management

1. Get Cluster Configuration

  • Endpoint: GET /api/cluster/config/get

  • Description: Get the complete BrokerConfig configuration of the current cluster

  • Request Parameters: None

  • Response Example:

json
{
  "code": 0,
  "data": {
    "cluster_name": "robust_mq_cluster_default",
    "broker_id": 1,
    "broker_ip": "192.168.1.100",
    "roles": ["broker", "meta"],
    "grpc_port": 1228,
    "http_port": 8080,
    "meta_addrs": {
      "1": "127.0.0.1:1228"
    },
    "prometheus": {
      "enable": true,
      "port": 9090
    },
    "log": {
      "log_path": "./logs",
      "log_config": "./config/broker-tracing.toml"
    },
    "runtime": {
      "runtime_worker_threads": 1,
      "server_worker_threads": 0,
      "meta_worker_threads": 0,
      "broker_worker_threads": 0,
      "channels_per_address": 10,
      "tls_cert": "./config/certs/cert.pem",
      "tls_key": "./config/certs/key.pem"
    },
    "network": {
      "accept_thread_num": 8,
      "handler_thread_num": 32,
      "queue_size": 1000
    },
    "pprof": {
      "enable": false,
      "port": 6060,
      "frequency": 100
    },
    "rocksdb": {
      "data_path": "./data",
      "max_open_files": 10000
    },
    "llm_client": null,
    "cluster_limit": {
      "max_network_connection": 100000000,
      "max_network_connection_rate": 10000,
      "max_admin_http_uri_rate": 50
    },
    "meta_runtime": {
      "heartbeat_timeout_ms": 30000,
      "heartbeat_check_time_ms": 1000,
      "raft_write_timeout_sec": 30,
      "offset_raft_group_num": 1,
      "data_raft_group_num": 1
    },
    "storage_runtime": {
      "tcp_port": 1778,
      "max_segment_size": 1073741824,
      "io_thread_num": 8,
      "data_path": [],
      "offset_enable_cache": true
    },
    "mqtt_server": {
      "tcp_port": 1883,
      "tls_port": 1885,
      "websocket_port": 8083,
      "websockets_port": 8085,
      "quic_port": 9083
    },
    "mqtt_keep_alive": {
      "enable": true,
      "default_time": 180,
      "max_time": 3600,
      "default_timeout": 2
    },
    "mqtt_runtime": {
      "default_user": "admin",
      "default_password": "robustmq",
      "durable_sessions_enable": false,
      "secret_free_login": false,
      "is_self_protection_status": false
    },
    "mqtt_offline_message": {
      "enable": true,
      "expire_ms": 0,
      "max_messages_num": 0
    },
    "mqtt_slow_subscribe": {
      "enable": false,
      "record_time": 1000,
      "delay_type": "Whole"
    },
    "mqtt_flapping_detect": {
      "enable": false,
      "window_time": 1,
      "max_client_connections": 15,
      "ban_time": 5
    },
    "mqtt_protocol": {
      "max_session_expiry_interval": 1800,
      "default_session_expiry_interval": 30,
      "topic_alias_max": 65535,
      "max_packet_size": 10485760,
      "receive_max": 65535,
      "max_message_expiry_interval": 3600,
      "client_pkid_persistent": false
    },
    "mqtt_schema": {
      "enable": true,
      "strategy": "ALL",
      "failed_operation": "Discard",
      "echo_log": true,
      "log_level": "info"
    },
    "mqtt_system_monitor": {
      "enable": false,
      "os_cpu_high_watermark": 70.0,
      "os_memory_high_watermark": 80.0,
      "system_topic_interval_ms": 60000
    },
    "mqtt_limit": {
      "cluster": {
        "max_connections_per_node": 10000000,
        "max_connection_rate": 100000,
        "max_topics": 5000000,
        "max_sessions": 50000000,
        "max_publish_rate": 10000
      },
      "tenant": {
        "max_connections_per_node": 1000000,
        "max_connection_rate": 10000,
        "max_topics": 500000,
        "max_sessions": 5000000,
        "max_publish_rate": 10000
      }
    }
  },
  "error": null
}

2. Set Cluster Configuration

  • Endpoint: POST /api/cluster/config/set
  • Description: Dynamically update cluster configuration. Changes take effect immediately and are persisted to Meta storage.
  • Request Parameters:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| config_type | string | Yes | Configuration type, see options below |
| config | string | Yes | JSON string of the corresponding configuration type |

config_type options and their config field structure:


MqttSlowSubscribeConfig — Slow Subscribe Detection

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enable | bool | false | Whether to enable |
| record_time | u64 | 1000 | Slow subscribe threshold (ms); subscriptions exceeding this time are recorded |
| delay_type | string | "CreateTime" | Delay measurement method: "CreateTime" or "PublishTime" |
json
{
  "config_type": "MqttSlowSubscribeConfig",
  "config": "{\"enable\":true,\"record_time\":500,\"delay_type\":\"CreateTime\"}"
}

MqttFlappingDetect — Connection Flapping Detection

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enable | bool | false | Whether to enable |
| window_time | u32 | 60 | Detection window duration (seconds) |
| max_client_connections | u64 | 5 | Maximum allowed connections in the window; exceeding this triggers a ban |
| ban_time | u32 | 300 | Ban duration (seconds) |
json
{
  "config_type": "MqttFlappingDetect",
  "config": "{\"enable\":true,\"window_time\":60,\"max_client_connections\":5,\"ban_time\":300}"
}

MqttProtocol — MQTT Protocol Parameters

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_session_expiry_interval | u32 | 2592000 | Maximum session expiry interval (seconds) |
| default_session_expiry_interval | u32 | 3600 | Default session expiry interval (seconds) |
| topic_alias_max | u16 | 65535 | Maximum number of topic aliases |
| max_packet_size | u32 | 10485760 | Maximum packet size (bytes, default 10 MB) |
| receive_max | u16 | 65535 | Receive window size |
| max_message_expiry_interval | u64 | 86400 | Maximum message expiry interval (seconds) |
| client_pkid_persistent | bool | false | Whether to persist client Packet IDs |
json
{
  "config_type": "MqttProtocol",
  "config": "{\"max_session_expiry_interval\":2592000,\"default_session_expiry_interval\":3600,\"topic_alias_max\":65535,\"max_packet_size\":10485760,\"receive_max\":65535,\"max_message_expiry_interval\":86400,\"client_pkid_persistent\":false}"
}

MqttOfflineMessage — Offline Messages

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enable | bool | false | Whether to enable |
| expire_ms | u32 | 86400000 | Offline message expiry time (ms) |
| max_messages_num | u32 | 1000 | Maximum offline messages stored per client |
json
{
  "config_type": "MqttOfflineMessage",
  "config": "{\"enable\":true,\"expire_ms\":86400000,\"max_messages_num\":1000}"
}

MqttSystemMonitor — System Monitor

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enable | bool | false | Whether to enable system monitoring |
| os_cpu_high_watermark | f32 | 0.7 | CPU usage high watermark (0~1) |
| os_memory_high_watermark | f32 | 0.8 | Memory usage high watermark (0~1) |
| system_topic_interval_ms | u64 | 60000 | System topic publish interval (ms) |
json
{
  "config_type": "MqttSystemMonitor",
  "config": "{\"enable\":true,\"os_cpu_high_watermark\":0.7,\"os_memory_high_watermark\":0.8,\"system_topic_interval_ms\":60000}"
}

MqttSchema — Schema Validation

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| enable | bool | false | Whether to enable schema validation |
| strategy | string | "Forward" | Validation strategy: "Forward" (pass-through) or "Strict" |
| failed_operation | string | "Disconnect" | Action on validation failure: "Disconnect" or "Ignore" |
| echo_log | bool | false | Whether to log schema validation results |
| log_level | string | "info" | Log level |
json
{
  "config_type": "MqttSchema",
  "config": "{\"enable\":true,\"strategy\":\"Strict\",\"failed_operation\":\"Disconnect\",\"echo_log\":false,\"log_level\":\"info\"}"
}

MqttLimit — MQTT Rate Limits

| Field | Type | Description |
| --- | --- | --- |
| cluster | object | Cluster-level limits, see LimitQuota |
| tenant | object | Tenant default limits, see LimitQuota |

LimitQuota fields:

| Field | Type | Description |
| --- | --- | --- |
| max_connections_per_node | u64 | Maximum connections per node |
| max_connection_rate | u32 | Maximum connection rate (connections/second) |
| max_topics | u64 | Maximum number of topics |
| max_sessions | u64 | Maximum number of sessions |
| max_publish_rate | u32 | Maximum publish rate (messages/second) |
json
{
  "config_type": "MqttLimit",
  "config": "{\"cluster\":{\"max_connections_per_node\":10000000,\"max_connection_rate\":100000,\"max_topics\":5000000,\"max_sessions\":50000000,\"max_publish_rate\":10000},\"tenant\":{\"max_connections_per_node\":1000000,\"max_connection_rate\":10000,\"max_topics\":500000,\"max_sessions\":5000000,\"max_publish_rate\":10000}}"
}

ClusterLimit — Cluster Access Limits

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_network_connection | u64 | 100000000 | Maximum total network connections |
| max_network_connection_rate | u32 | 10000 | Maximum network connection rate (connections/second) |
| max_admin_http_uri_rate | u32 | 50 | Maximum Admin HTTP request rate (requests/second) |
json
{
  "config_type": "ClusterLimit",
  "config": "{\"max_network_connection\":100000000,\"max_network_connection_rate\":10000,\"max_admin_http_uri_rate\":50}"
}

  • Response Example:
json
{
  "code": 0,
  "data": "success",
  "error": null
}

Cluster Information

3. Get Cluster Status

  • Endpoint: GET /api/cluster/status

  • Description: Returns cluster runtime status, including version, node list, and Raft group status for each internal group (mqtt, offset, meta).

  • Response Example:

json
{
  "code": 0,
  "data": {
    "version": "0.3.0",
    "cluster_name": "robust_mq_cluster_default",
    "start_time": 1738800000,
    "broker_node_list": [],
    "nodes": ["127.0.0.1"],
    "meta": {
      "mqtt": {
        "running_state": { "Ok": null },
        "id": 1,
        "current_term": 1,
        "vote": { "leader_id": { "term": 1, "node_id": 1 }, "committed": true },
        "last_log_index": 30001,
        "last_applied": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "snapshot": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "purged": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 },
        "state": "Leader",
        "current_leader": 1,
        "millis_since_quorum_ack": 0,
        "membership_config": {
          "log_id": { "leader_id": { "term": 0, "node_id": 0 }, "index": 0 },
          "membership": {
            "configs": [[1]],
            "nodes": { "1": { "node_id": 1, "rpc_addr": "127.0.0.1:1228" } }
          }
        },
        "replication": { "1": { "leader_id": { "term": 1, "node_id": 1 }, "index": 30001 } }
      },
      "offset": {
        "state": "Leader",
        "current_leader": 1,
        "last_log_index": 1,
        "...": "..."
      },
      "meta": {
        "state": "Leader",
        "current_leader": 1,
        "last_log_index": 42853,
        "...": "..."
      }
    }
  },
  "error": null
}

data fields:

| Field | Type | Description |
| --- | --- | --- |
| version | string | Current Broker version |
| cluster_name | string | Cluster name |
| start_time | u64 | Process start time (Unix seconds) |
| broker_node_list | array | List of all Broker nodes in the cluster |
| nodes | string[] | Deduplicated list of cluster node IPs |
| meta | object | Raft group status map, keyed by group name |

meta keys:

| Key | Description |
| --- | --- |
| mqtt | MQTT data Raft group status |
| offset | Offset data Raft group status |
| meta | Metadata Raft group status |

Per-group status (MetaStatus) fields:

| Field | Type | Description |
| --- | --- | --- |
| running_state | object | Runtime health; {"Ok": null} means healthy |
| id | u64 | Current node ID |
| current_term | u64 | Current Raft term |
| vote | object | Current vote information |
| last_log_index | u64 | Latest log index |
| last_applied | object | Log position last applied to the state machine |
| snapshot | object/null | Latest snapshot position |
| purged | object/null | Oldest log position that has been purged |
| state | string | Node role: Leader, Follower, or Candidate |
| current_leader | u64 | Current Leader node ID |
| millis_since_quorum_ack | u64 | Milliseconds since last quorum acknowledgement |
| membership_config | object | Cluster membership configuration |
| replication | object | Per-node replication progress (key = node ID) |

BrokerConfig Field Reference

Base Configuration

| Field | Type | Description |
| --- | --- | --- |
| cluster_name | string | Cluster name |
| broker_id | u64 | Broker node ID |
| broker_ip | string/null | Broker IP address |
| roles | string[] | Node roles: meta, broker, engine |
| grpc_port | u32 | gRPC service port |
| http_port | u32 | HTTP API service port |
| meta_addrs | object | Meta node address map (node ID → address) |

prometheus

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable Prometheus metrics |
| port | u16 | Prometheus metrics port |

log

| Field | Type | Description |
| --- | --- | --- |
| log_path | string | Log output directory |
| log_config | string | Log tracing config file path |

runtime

| Field | Type | Description |
| --- | --- | --- |
| runtime_worker_threads | usize | Legacy global thread multiplier; used as fallback when per-runtime fields are 0 |
| server_worker_threads | usize | Server runtime threads (0 = auto, equals CPU core count) |
| meta_worker_threads | usize | Meta runtime threads (0 = auto) |
| broker_worker_threads | usize | Broker runtime threads (0 = auto, hot-path runtime) |
| channels_per_address | usize | Number of gRPC connection channels per address |
| tls_cert | string | TLS certificate file path |
| tls_key | string | TLS private key file path |

network

| Field | Type | Description |
| --- | --- | --- |
| accept_thread_num | usize | Number of connection accept threads |
| handler_thread_num | usize | Number of message handler threads |
| queue_size | usize | Internal queue size |

pprof

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable pprof profiling |
| port | u16 | pprof HTTP port |
| frequency | i32 | Sampling frequency |

meta_runtime

| Field | Type | Description |
| --- | --- | --- |
| heartbeat_timeout_ms | u64 | Heartbeat timeout (ms) |
| heartbeat_check_time_ms | u64 | Heartbeat check interval (ms) |
| raft_write_timeout_sec | u64 | Raft write timeout (seconds) |
| offset_raft_group_num | u32 | Number of Offset Raft shard groups (default 1) |
| data_raft_group_num | u32 | Number of data Raft shard groups (default 1) |

rocksdb

| Field | Type | Description |
| --- | --- | --- |
| data_path | string | RocksDB data directory |
| max_open_files | i32 | Maximum open files |

storage_runtime

| Field | Type | Description |
| --- | --- | --- |
| tcp_port | u32 | Storage engine TCP port |
| max_segment_size | u32 | Maximum segment file size (bytes) |
| io_thread_num | u32 | Number of IO threads |
| data_path | string[] | List of data storage paths |
| offset_enable_cache | bool | Whether to enable offset cache |

mqtt_server

| Field | Type | Description |
| --- | --- | --- |
| tcp_port | u32 | MQTT TCP port |
| tls_port | u32 | MQTT TLS port |
| websocket_port | u32 | MQTT WebSocket port |
| websockets_port | u32 | MQTT WebSocket Secure port |
| quic_port | u32 | MQTT QUIC port |

mqtt_keep_alive

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable keep-alive detection |
| default_time | u16 | Default keep-alive time (seconds) |
| max_time | u16 | Maximum keep-alive time (seconds) |
| default_timeout | u16 | Timeout multiplier |

mqtt_runtime

| Field | Type | Description |
| --- | --- | --- |
| default_user | string | Default username |
| default_password | string | Default password |
| durable_sessions_enable | bool | Whether to enable durable sessions |
| secret_free_login | bool | Whether to allow password-free login |
| is_self_protection_status | bool | Whether the node is in self-protection mode |

mqtt_offline_message

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable offline messages |
| expire_ms | u32 | Expiry time (ms), 0 means no expiry |
| max_messages_num | u32 | Maximum offline messages per client, 0 means unlimited |

mqtt_slow_subscribe

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable slow subscribe detection |
| record_time | u64 | Recording threshold (ms) |
| delay_type | string | Delay type: Whole, Internal, Response |

mqtt_flapping_detect

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable connection flapping detection |
| window_time | u32 | Detection window (minutes) |
| max_client_connections | u64 | Maximum connections in the window |
| ban_time | u32 | Ban duration (minutes) |

mqtt_protocol

| Field | Type | Description |
| --- | --- | --- |
| max_session_expiry_interval | u32 | Maximum session expiry interval (seconds) |
| default_session_expiry_interval | u32 | Default session expiry interval (seconds) |
| topic_alias_max | u16 | Maximum Topic Alias value |
| max_packet_size | u32 | Maximum packet size (bytes) |
| receive_max | u16 | Receive maximum |
| max_message_expiry_interval | u64 | Maximum message expiry interval (seconds) |
| client_pkid_persistent | bool | Whether to persist client Packet IDs |

mqtt_schema

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable schema validation |
| strategy | string | Validation strategy: ALL, Any |
| failed_operation | string | Action on failure: Discard, DisconnectAndDiscard, Ignore |
| echo_log | bool | Whether to output validation logs |
| log_level | string | Log level |

mqtt_system_monitor

| Field | Type | Description |
| --- | --- | --- |
| enable | bool | Whether to enable system monitoring |
| os_cpu_high_watermark | f32 | CPU high watermark (percentage) |
| os_memory_high_watermark | f32 | Memory high watermark (percentage) |
| system_topic_interval_ms | u64 | System topic metrics publish interval (ms) |

cluster_limit

| Field | Type | Description |
| --- | --- | --- |
| max_network_connection | u64 | Maximum total network connections in the cluster |
| max_network_connection_rate | u32 | Maximum new connection rate per second in the cluster |
| max_admin_http_uri_rate | u32 | Maximum Admin HTTP request rate per second |

mqtt_limit

Both cluster and tenant use the LimitQuota structure:

| Field | Type | Description |
| --- | --- | --- |
| max_connections_per_node | u64 | Maximum connections per node |
| max_connection_rate | u32 | Maximum new connection rate per second |
| max_topics | u64 | Maximum number of topics |
| max_sessions | u64 | Maximum number of sessions |
| max_publish_rate | u32 | Maximum publish rate per second |

llm_client

Optional configuration; null when not set.

| Field | Type | Description |
| --- | --- | --- |
| platform | string | LLM platform, e.g. open_ai, anthropic, ollama |
| model | string | Model name |
| token | string/null | API token (not required for Ollama) |
| base_url | string/null | Custom API base URL (must start with http:// or https://) |

Usage Examples

Get Cluster Configuration

bash
curl -X GET http://localhost:8080/api/cluster/config/get

Get Cluster Status

bash
curl -X GET http://localhost:8080/api/cluster/status

Set Flapping Detection Configuration

bash
curl -X POST http://localhost:8080/api/cluster/config/set \
  -H "Content-Type: application/json" \
  -d '{
    "config_type": "MqttFlappingDetect",
    "config": "{\"enable\":true,\"window_time\":2,\"max_client_connections\":20,\"ban_time\":10}"
  }'

Tenant Management

A Tenant is the core multi-tenancy concept in RobustMQ, providing logical isolation of cluster resources. Use tenants when a single cluster serves multiple business units, multiple environments (dev / staging / prod), or multiple teams.

4. List Tenants

  • Endpoint: GET /api/tenant/list
  • Description: List all tenants. Supports pagination, sorting, and filtering.
  • Query Parameters:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| tenant_name | string | No | Fuzzy search by tenant name (contains match) |
| page | u32 | No | Page number, starting from 1 (default: 1) |
| limit | u32 | No | Items per page (default: 100) |
| sort_field | string | No | Sort field; supports tenant_name |
| sort_by | string | No | Sort direction: asc or desc |
  • Response Example:
json
{
  "code": 0,
  "data": {
    "data": [
      {
        "tenant_name": "business-a",
        "desc": "Business A tenant",
        "config": {
          "max_connections_per_node": 10000000,
          "max_create_connection_rate_per_second": 10000,
          "max_topics": 5000000,
          "max_sessions": 50000000,
          "max_publish_rate": 10000
        },
        "create_time": 1738800000
      }
    ],
    "total_count": 1
  },
  "error": null
}
  • curl Example:
bash
# List all tenants
curl -X GET "http://localhost:8080/api/tenant/list"

# Fuzzy search tenants with "business" in the name
curl -X GET "http://localhost:8080/api/tenant/list?tenant_name=business"

5. Create Tenant

  • Endpoint: POST /api/tenant/create
  • Description: Create a new tenant.
  • Request Body:
| Field | Type | Required | Validation | Description |
| --- | --- | --- | --- | --- |
| tenant_name | string | Yes | Length 1–128 | Tenant name, must be globally unique |
| desc | string | No | Length ≤ 500 | Human-readable description |
| config | object | No | - | Tenant resource quota; defaults are used if not set |
| config.max_connections_per_node | u64 | No | - | Max connections per node (default: 10000000) |
| config.max_create_connection_rate_per_second | u32 | No | - | Max new connection rate per second (default: 10000) |
| config.max_topics | u64 | No | - | Max topics (default: 5000000) |
| config.max_sessions | u64 | No | - | Max sessions (default: 50000000) |
| config.max_publish_rate | u32 | No | - | Max publish rate per second (default: 10000) |
  • Request Example:
json
{
  "tenant_name": "business-a",
  "desc": "Business A tenant",
  "config": {
    "max_connections_per_node": 50000,
    "max_topics": 100000,
    "max_sessions": 200000,
    "max_publish_rate": 5000
  }
}
  • Response Example:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • curl Example:
bash
curl -X POST http://localhost:8080/api/tenant/create \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a", "desc": "Business A tenant"}'

6. Delete Tenant

  • Endpoint: POST /api/tenant/delete
  • Description: Delete a tenant by name. After deletion, metadata belonging to the tenant is no longer managed by it.
  • Request Body:
| Field | Type | Required | Validation | Description |
| --- | --- | --- | --- | --- |
| tenant_name | string | Yes | Length 1–128 | Name of the tenant to delete |
  • Request Example:
json
{
  "tenant_name": "business-a"
}
  • Response Example:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • curl Example:
bash
curl -X POST http://localhost:8080/api/tenant/delete \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a"}'

7. Update Tenant

  • Endpoint: POST /api/tenant/update
  • Description: Update a tenant's description and resource quota. The tenant must already exist. If config is omitted, the existing configuration is preserved.
  • Request Body:
| Field | Type | Required | Validation | Description |
| --- | --- | --- | --- | --- |
| tenant_name | string | Yes | Length 1–128 | Name of the tenant to update |
| desc | string | No | Length ≤ 500 | New description |
| config | object | No | - | Resource quota; existing config is kept if omitted |
| config.max_connections_per_node | u64 | No | - | Max connections per node |
| config.max_create_connection_rate_per_second | u32 | No | - | Max new connection rate per second |
| config.max_topics | u64 | No | - | Max topics |
| config.max_sessions | u64 | No | - | Max sessions |
| config.max_publish_rate | u32 | No | - | Max publish rate per second |
  • Request Example:
json
{
  "tenant_name": "business-a",
  "desc": "Business A tenant (updated)",
  "config": {
    "max_connections_per_node": 100000,
    "max_publish_rate": 20000
  }
}
  • Response Example:
json
{
  "code": 0,
  "data": "success",
  "error": null
}
  • Error Response (when tenant does not exist):
json
{
  "code": 100,
  "data": "",
  "error": "Tenant business-a not found"
}
  • curl Example:
bash
curl -X POST http://localhost:8080/api/tenant/update \
  -H "Content-Type: application/json" \
  -d '{"tenant_name": "business-a", "desc": "Business A tenant (updated)", "config": {"max_connections_per_node": 100000}}'

Health Check

8. Liveness Check

  • Endpoint: GET /cluster/healthy
  • Description: Check if the service is alive. Returns true when healthy.
  • Request Parameters: None
  • Response Example:
json
{
  "code": 0,
  "message": "success",
  "data": true
}

9. Readiness Check

  • Endpoint: GET /health/ready
  • Description: Check whether all configured ports are ready. Intended for use as a Kubernetes readiness probe.
  • Request Parameters: None
  • Response:
    • 200 OK — All ports ready:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "ready",
    "message": "all configured ports are ready"
  }
}
    • 503 Service Unavailable — Ports not ready:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "not_ready",
    "check_type": "ready",
    "message": "one or more configured ports are not ready"
  }
}

10. Node Health Check

  • Endpoint: GET /health/node
  • Description: Node-level health check (placeholder implementation, always returns ok).
  • Request Parameters: None
  • Response Example:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "node",
    "message": "health check placeholder"
  }
}

11. Cluster Health Check

  • Endpoint: GET /health/cluster
  • Description: Cluster-level health check (placeholder implementation, always returns ok).
  • Request Parameters: None
  • Response Example:
json
{
  "code": 0,
  "message": "success",
  "data": {
    "status": "ok",
    "check_type": "cluster",
    "message": "health check placeholder"
  }
}

Cluster Resource APIs

Topic, User, ACL, Blacklist, Connector, Schema, and Tenant are all cluster-wide shared resources. Their APIs are unified under the /api/cluster prefix.

12. Topic Management

12.1 Topic List Query

  • Endpoint: GET /api/cluster/topic/list
  • Description: Query the topic list with optional tenant filter, fuzzy topic name search, and topic type filter.
  • Request Parameters:
| Field | Type | Required | Description |
| --- | --- | --- | --- |
| tenant | string | No | Filter by tenant (exact match) |
| topic_name | string | No | Fuzzy search by topic name |
| topic_type | string | No | all (default), normal, system (contains $) |
| limit | u32 | No | Page size |
| page | u32 | No | Page number, starting from 1 |
| sort_field | string | No | Sort field: topic_name, tenant |
| sort_by | string | No | asc / desc |
  • Response Example:
json
{ "code": 0, "data": { "data": [ { "topic_id": "...", "topic_name": "sensor/temperature", "tenant": "default" } ], "total_count": 25 } }

12.2 Topic Detail Query

  • Endpoint: GET /api/cluster/topic/detail
  • Description: Query detailed information for a specific topic, including retain messages, subscriber list, and storage shards.
  • Request Parameters: tenant (required), topic_name (required)

12.3 Delete Topic

  • Endpoint: POST /api/cluster/topic/delete
  • Description: Delete a topic (soft delete; shard data is cleaned up asynchronously in the background).
  • Request Body: { "tenant": "default", "topic_name": "sensor/temperature" }
  • Response: Returns "success" on success.

12.4 Topic Rewrite Rules List

  • Endpoint: GET /api/cluster/topic-rewrite/list
  • Request Parameters: tenant, name, limit, page, sort_field, sort_by

12.5 Create Topic Rewrite Rule

  • Endpoint: POST /api/cluster/topic-rewrite/create
  • Request Body: { "name", "tenant", "action", "source_topic", "dest_topic", "regex", "desc" }
  • action: All | Publish | Subscribe

12.6 Delete Topic Rewrite Rule

  • Endpoint: POST /api/cluster/topic-rewrite/delete
  • Request Body: { "tenant": "default", "name": "my-rule" }

13. User Management

13.1 User List Query

  • Endpoint: GET /api/cluster/user/list
  • Request Parameters: tenant, username, limit, page, sort_field, sort_by

13.2 Create User

  • Endpoint: POST /api/cluster/user/create
  • Request Body:
json
{
  "username": "alice",
  "password": "secret",
  "is_superuser": false
}

13.3 Delete User

  • Endpoint: POST /api/cluster/user/delete
  • Request Body: { "username": "alice" }

14. ACL Management

14.1 ACL List Query

  • Endpoint: GET /api/cluster/acl/list
  • Description: Query ACL rules with optional fuzzy filtering on tenant, name, and resource_name.
  • Request Parameters: tenant, name, resource_name, limit, page, sort_field, sort_by

14.2 Create ACL Rule

  • Endpoint: POST /api/cluster/acl/create
  • Request Body:
json
{
  "tenant": "default",
  "name": "rule-name",
  "resource_type": "ClientId",
  "resource_name": "client001",
  "action": "Publish",
  "permission": "Allow",
  "topic": "sensor/+",
  "ip": "",
  "desc": ""
}
  • resource_type: ClientId | User | Ip
  • action: Publish | Subscribe | All
  • permission: Allow | Deny

14.3 Delete ACL Rule

  • Endpoint: POST /api/cluster/acl/delete
  • Request Body: { "tenant": "default", "name": "rule-name" }

15. Blacklist Management

15.1 Blacklist List Query

  • Endpoint: GET /api/cluster/blacklist/list
  • Request Parameters: tenant, name, resource_name, limit, page, sort_field, sort_by

15.2 Create Blacklist Entry

  • Endpoint: POST /api/cluster/blacklist/create
  • Request Body:
json
{
  "name": "bl-bad-client",
  "tenant": "default",
  "blacklist_type": "ClientId",
  "resource_name": "bad_client",
  "end_time": 1735689599,
  "desc": ""
}
  • blacklist_type: ClientId | User | Ip | ClientIdMatch | UserMatch | IPCIDR

15.3 Delete Blacklist Entry

  • Endpoint: POST /api/cluster/blacklist/delete
  • Request Body: { "tenant": "default", "name": "bl-bad-client" }

16. Connector Management

16.1 Connector List Query

  • Endpoint: GET /api/cluster/connector/list
  • Request Parameters: tenant, connector_name, limit, page, sort_field, sort_by

16.2 Connector Detail

  • Endpoint: GET /api/cluster/connector/detail
  • Request Parameters: tenant, connector_name

16.3 Create Connector

  • Endpoint: POST /api/cluster/connector/create
  • Request Body:
json
{
  "connector_name": "kafka-sink",
  "connector_type": "kafka",
  "tenant": "default",
  "topic_name": "sensor/temperature",
  "config": "{ ... }",
  "failure_strategy": { "strategy": "discard" }
}
  • connector_type: kafka | pulsar | rabbitmq | postgres | mysql | mongodb | elasticsearch | redis | webhook | opentsdb | mqtt | clickhouse | influxdb | cassandra | s3 | file | greptime
  • failure_strategy.strategy: discard | discard_after_retry | dead_message_queue

16.4 Delete Connector

  • Endpoint: POST /api/cluster/connector/delete
  • Request Body: { "tenant": "default", "connector_name": "kafka-sink" }

17. Schema Management

17.1 Schema List Query

  • Endpoint: GET /api/cluster/schema/list
  • Request Parameters: tenant, name, limit, page, sort_field, sort_by

17.2 Create Schema

  • Endpoint: POST /api/cluster/schema/create
  • Request Body:
json
{
  "tenant": "default",
  "schema_name": "sensor-schema",
  "schema_type": "json",
  "schema": "{ \"type\": \"object\" }",
  "desc": ""
}
  • schema_type: json | avro | protobuf

17.3 Delete Schema

  • Endpoint: POST /api/cluster/schema/delete
  • Request Body: { "tenant": "default", "schema_name": "sensor-schema" }

17.4 Schema Binding List

  • Endpoint: GET /api/cluster/schema-bind/list
  • Request Parameters: tenant, resource_name, schema_name, limit, page

17.5 Create Schema Binding

  • Endpoint: POST /api/cluster/schema-bind/create
  • Request Body: { "tenant": "default", "schema_name": "sensor-schema", "resource_name": "sensor/+" }

17.6 Delete Schema Binding

  • Endpoint: POST /api/cluster/schema-bind/delete
  • Request Body: { "tenant": "default", "schema_name": "sensor-schema", "resource_name": "sensor/+" }

18. Tenant Management (Cluster-Wide)

These endpoints replace the former /mqtt/tenant/* endpoints. Full CRUD including update is supported.

18.1 List Tenants

  • Endpoint: GET /api/cluster/tenant/list
  • Request Parameters: tenant_name, limit, page, sort_field, sort_by

18.2 Create Tenant

  • Endpoint: POST /api/cluster/tenant/create
  • Request Body:
json
{
  "tenant_name": "my-tenant",
  "desc": "",
  "config": {
    "max_connections_per_node": 10000,
    "max_create_connection_rate_per_second": 100,
    "max_topics": 10000,
    "max_sessions": 10000,
    "max_publish_rate": 10000
  }
}

18.3 Update Tenant

  • Endpoint: POST /api/cluster/tenant/update
  • Request Body: { "tenant_name": "my-tenant", "desc": "updated", "config": { ... } }

18.4 Delete Tenant

  • Endpoint: POST /api/cluster/tenant/delete
  • Request Body: { "tenant_name": "my-tenant" }

Notes

  1. Response Format: On success, code is 0 and error is null; on failure, code is 100 and error contains the error message.
  2. Configuration Format: The config field in config/set must be a valid JSON string.
  3. Hot Update: Most configurations support hot updates without restarting the service.
  4. Backup Recommendation: It is recommended to fetch the current configuration via config/get before making changes.
  5. Tenant Isolation: Tenants provide logical isolation, suitable for private deployments. For SaaS multi-tenancy requiring stronger isolation boundaries, consider physical separation.
🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀