Skip to content

Redis 连接器

概述

Redis 连接器是 RobustMQ 提供的数据集成组件,用于将 MQTT 消息桥接到 Redis 数据库。该连接器支持 Single、Cluster、Sentinel 三种部署模式,通过命令模板灵活定义数据写入方式,适合实时缓存、消息队列、会话存储和计数器等场景。

配置说明

连接器配置

Redis 连接器使用 RedisConnectorConfig 结构进行配置:

rust
pub struct RedisConnectorConfig {
    pub server: String,                       // Redis 服务器地址
    pub mode: RedisMode,                      // 部署模式
    pub database: u8,                         // 数据库编号
    pub username: Option<String>,             // 用户名
    pub password: Option<String>,             // 密码
    pub sentinel_master_name: Option<String>, // Sentinel 主节点名称
    pub command_template: String,             // 命令模板
    pub tls_enabled: bool,                    // 是否启用 TLS
    pub connect_timeout_ms: u64,              // 连接超时时间(毫秒)
    pub pool_size: u32,                       // 连接池大小
    pub max_retries: u32,                     // 最大重试次数
    pub retry_interval_ms: u64,              // 重试间隔(毫秒)
}

pub enum RedisMode {
    Single,    // 单机模式
    Cluster,   // 集群模式
    Sentinel,  // 哨兵模式
}

配置参数

参数名类型必填默认值说明示例
serverString-Redis 服务器地址,长度不超过 1024 字符127.0.0.1:6379
modeStringsingle部署模式:singleclustersentinelsingle
databaseNumber0数据库编号,范围:0-150
usernameString-用户名(Redis 6.0+ ACL)default
passwordString-密码password123
sentinel_master_nameString-Sentinel 主节点名称,Sentinel 模式时必填mymaster
command_templateString-命令模板,支持 ${key}${payload} 等占位符,长度不超过 4096 字符SET ${key} ${payload}
tls_enabledBooleanfalse是否启用 TLS 加密连接true
connect_timeout_msNumber5000连接超时时间(毫秒)10000
pool_sizeNumber10连接池大小,范围:1-10020
max_retriesNumber3最大重试次数5
retry_interval_msNumber1000重试间隔(毫秒)2000

命令模板

命令模板支持以下占位符:

占位符说明
${key}消息键值
${payload}消息内容
${timestamp}消息时间戳

命令模板示例

SET ${key} ${payload}
LPUSH mqtt:messages ${payload}
HSET mqtt:data ${key} ${payload}
PUBLISH mqtt:channel ${payload}
SETEX ${key} 3600 ${payload}

配置示例

JSON 配置格式

单机模式

json
{
  "server": "127.0.0.1:6379",
  "command_template": "LPUSH mqtt:messages ${payload}"
}

带密码认证

json
{
  "server": "127.0.0.1:6379",
  "password": "your_password",
  "database": 1,
  "command_template": "SET mqtt:${key} ${payload}",
  "pool_size": 20
}

集群模式

json
{
  "server": "redis-1:6379,redis-2:6379,redis-3:6379",
  "mode": "cluster",
  "password": "cluster_password",
  "command_template": "SET mqtt:${key} ${payload}",
  "pool_size": 30,
  "connect_timeout_ms": 10000
}

哨兵模式

json
{
  "server": "sentinel-1:26379,sentinel-2:26379,sentinel-3:26379",
  "mode": "sentinel",
  "sentinel_master_name": "mymaster",
  "password": "sentinel_password",
  "command_template": "LPUSH mqtt:events ${payload}",
  "pool_size": 15
}

使用 robust-ctl 创建 Redis 连接器

基本语法

bash
robust-ctl mqtt connector create \
  --connector-name <连接器名> \
  --connector-type <连接器类> \
  --config <> \
  --topic-id <主题ID>

创建示例

1. 基本创建命令

bash
robust-ctl mqtt connector create \
  --connector-name "redis_connector_01" \
  --connector-type "redis" \
  --config '{"server": "127.0.0.1:6379", "command_template": "LPUSH mqtt:messages ${payload}"}' \
  --topic-id "sensor/data"

2. 带认证的 Redis

bash
robust-ctl mqtt connector create \
  --connector-name "redis_auth" \
  --connector-type "redis" \
  --config '{"server": "127.0.0.1:6379", "password": "your_password", "database": 1, "command_template": "SET mqtt:${key} ${payload}", "pool_size": 20}' \
  --topic-id "device/status"

管理连接器

bash
# 列出所有连接器
robust-ctl mqtt connector list

# 查看指定连接器
robust-ctl mqtt connector list --connector-name "redis_connector_01"

# 删除连接器
robust-ctl mqtt connector delete --connector-name "redis_connector_01"

完整操作示例

场景:IoT 实时数据缓存

bash
# 1. 创建传感器数据缓存连接器
robust-ctl mqtt connector create \
  --connector-name "sensor_cache" \
  --connector-type "redis" \
  --config '{"server": "127.0.0.1:6379", "command_template": "SETEX sensor:${key} 3600 ${payload}", "pool_size": 20}' \
  --topic-id "iot/sensors/temperature"

# 2. 创建设备事件队列连接器
robust-ctl mqtt connector create \
  --connector-name "event_queue" \
  --connector-type "redis" \
  --config '{"server": "127.0.0.1:6379", "command_template": "LPUSH iot:events ${payload}", "database": 2}' \
  --topic-id "iot/events"

# 3. 查看创建的连接器
robust-ctl mqtt connector list

性能优化建议

1. 连接池

  • 根据并发量合理设置 pool_size
  • 高吞吐量场景建议增大连接池

2. 命令选择

  • 使用批量命令(如 LPUSH)减少网络往返
  • 对于需要过期的数据使用 SETEXEXPIRE
  • 避免使用阻塞命令

3. 部署模式

  • 单机模式适合开发和测试
  • 生产环境建议使用 Cluster 或 Sentinel 模式
  • Sentinel 模式提供自动故障转移

4. 安全建议

  • 生产环境建议启用 TLS
  • 使用 ACL(Redis 6.0+)进行细粒度权限控制
  • 定期轮换密码

监控和故障排查

1. 查看连接器状态

bash
robust-ctl mqtt connector list --connector-name "redis_connector_01"

2. 常见问题

问题 1:连接失败

  • 检查 Redis 服务是否正常运行
  • 验证服务器地址和端口是否正确
  • 检查网络连接和防火墙设置

问题 2:认证失败

  • 验证密码是否正确
  • 检查用户名和 ACL 配置(Redis 6.0+)

问题 3:写入超时

  • 增加 connect_timeout_ms 配置
  • 检查 Redis 服务负载
  • 考虑增大连接池大小

问题 4:Sentinel 模式连接问题

  • 确认 sentinel_master_name 配置正确
  • 检查 Sentinel 节点是否正常运行
  • 验证所有 Sentinel 节点的连通性

总结

Redis 连接器是 RobustMQ 数据集成系统的重要组件,提供了与 Redis 数据库的无缝集成能力。通过合理的配置和使用,可以实现:

  • 实时缓存:将 MQTT 消息实时写入 Redis,支持高速读写
  • 灵活命令:通过命令模板自定义数据写入方式
  • 高可用:支持 Single、Cluster、Sentinel 三种部署模式
  • 高性能:连接池和批量写入确保高吞吐量场景下的性能