PostgreSQL 连接器 
概述 
PostgreSQL 连接器是 RobustMQ 提供的数据集成组件,用于将 MQTT 消息桥接到 PostgreSQL 关系型数据库系统。该连接器支持高性能的数据写入,适合 IoT 数据存储、历史数据分析、数据持久化和业务数据集成等场景。
配置说明 
连接器配置 
PostgreSQL 连接器使用 PostgresConnectorConfig 结构进行配置:
rust
pub struct PostgresConnectorConfig {
    pub host: String,                    // PostgreSQL 服务器地址
    pub port: u16,                       // PostgreSQL 服务器端口
    pub database: String,                // 数据库名称
    pub username: String,                // 用户名
    pub password: String,                // 密码
    pub table: String,                   // 目标表名
    pub pool_size: Option<u32>,          // 连接池大小
    pub enable_batch_insert: Option<bool>, // 启用批量插入
    pub enable_upsert: Option<bool>,     // 启用 UPSERT 操作
    pub conflict_columns: Option<String>, // 冲突列定义
}配置参数 
| 参数名 | 类型 | 必填 | 说明 | 示例 | 
|---|---|---|---|---|
| host | String | 是 | PostgreSQL 服务器地址 | localhost或192.168.1.100 | 
| port | u16 | 是 | PostgreSQL 服务器端口 | 5432 | 
| database | String | 是 | 数据库名称 | mqtt_data | 
| username | String | 是 | 数据库用户名 | postgres | 
| password | String | 是 | 数据库密码 | password123 | 
| table | String | 是 | 目标表名 | mqtt_messages | 
| pool_size | u32 | 否 | 连接池大小,默认为 10 | 20 | 
| enable_batch_insert | bool | 否 | 启用批量插入,默认为 false | true | 
| enable_upsert | bool | 否 | 启用 UPSERT 操作,默认为 false | true | 
| conflict_columns | String | 否 | 冲突列定义,默认为 "client_id, topic" | "client_id, topic" | 
配置示例 
JSON 配置格式 
json
{
  "host": "localhost",
  "port": 5432,
  "database": "mqtt_data",
  "username": "postgres",
  "password": "password123",
  "table": "mqtt_messages",
  "pool_size": 20,
  "enable_batch_insert": true,
  "enable_upsert": false,
  "conflict_columns": "client_id, topic"
}完整连接器配置 
json
{
  "cluster_name": "default",
  "connector_name": "postgres_connector_01",
  "connector_type": "Postgres",
  "config": "{\"host\": \"localhost\", \"port\": 5432, \"database\": \"mqtt_data\", \"username\": \"postgres\", \"password\": \"password123\", \"table\": \"mqtt_messages\", \"enable_batch_insert\": true}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": null,
  "create_time": 1640995200,
  "update_time": 1640995200
}数据库表结构 
表结构定义 
PostgreSQL 连接器要求目标表具有以下结构:
sql
CREATE TABLE mqtt_messages (
    client_id VARCHAR(255) NOT NULL,     -- MQTT 客户端 ID
    topic VARCHAR(500) NOT NULL,         -- MQTT 主题
    timestamp BIGINT NOT NULL,           -- 消息时间戳(秒)
    payload TEXT,                        -- 消息载荷(字符串格式)
    data BYTEA,                          -- 消息原始数据(二进制格式)
    PRIMARY KEY (client_id, topic, timestamp)
);字段说明 
| 字段名 | 数据类型 | 说明 | 示例 | 
|---|---|---|---|
| client_id | VARCHAR(255) | MQTT 客户端 ID | sensor_001 | 
| topic | VARCHAR(500) | MQTT 主题名称 | sensor/temperature | 
| timestamp | BIGINT | 消息时间戳(秒级) | 1640995200 | 
| payload | TEXT | 消息载荷(UTF-8 字符串) | {"temperature": 25.5} | 
| data | BYTEA | 消息原始数据(二进制) | \x7b2274656d70657261747572... | 
索引建议 
为了提高查询性能,建议创建以下索引:
sql
-- 时间范围查询索引
CREATE INDEX idx_mqtt_messages_timestamp ON mqtt_messages (timestamp);
-- 主题查询索引
CREATE INDEX idx_mqtt_messages_topic ON mqtt_messages (topic);
-- 客户端查询索引
CREATE INDEX idx_mqtt_messages_client_id ON mqtt_messages (client_id);
-- 复合查询索引
CREATE INDEX idx_mqtt_messages_topic_timestamp ON mqtt_messages (topic, timestamp);高级特性 
批量插入 
启用 enable_batch_insert 选项可以显著提高写入性能:
json
{
  "enable_batch_insert": true
}优势:
- 减少网络往返次数
- 提高数据库写入吞吐量
- 降低系统资源消耗
注意事项:
- 批量大小由系统自动控制(默认 100 条记录)
- 适合高频率消息场景
UPSERT 操作 
启用 enable_upsert 选项可以处理重复数据:
json
{
  "enable_upsert": true,
  "conflict_columns": "client_id, topic"
}工作原理:
- 当遇到冲突时,更新现有记录的 timestamp、payload和data字段
- 冲突检测基于 conflict_columns指定的列组合
SQL 示例:
sql
INSERT INTO mqtt_messages (client_id, topic, timestamp, payload, data) 
VALUES ($1, $2, $3, $4, $5) 
ON CONFLICT (client_id, topic) 
DO UPDATE SET 
    timestamp = EXCLUDED.timestamp, 
    payload = EXCLUDED.payload, 
    data = EXCLUDED.data连接池管理 
连接器使用连接池来管理数据库连接:
json
{
  "pool_size": 20
}配置建议:
- 低并发场景:5-10 个连接
- 中等并发场景:10-20 个连接
- 高并发场景:20-50 个连接
使用 robust-ctl 创建 PostgreSQL 连接器 
基本语法 
使用 robust-ctl 命令行工具可以方便地创建和管理 PostgreSQL 连接器:
bash
robust-ctl mqtt connector create \
  --connector-name <连接器名称> \
  --connector-type <连接器类型> \
  --config <配置> \
  --topic-id <主题ID>创建 PostgreSQL 连接器 
1. 基本创建命令 
bash
# 创建 PostgreSQL 连接器
robust-ctl mqtt connector create \
  --connector-name "postgres_connector_01" \
  --connector-type "Postgres" \
  --config '{"host": "localhost", "port": 5432, "database": "mqtt_data", "username": "postgres", "password": "password123", "table": "mqtt_messages"}' \
  --topic-id "sensor/data"2. 参数说明 
| 参数 | 说明 | 示例值 | 
|---|---|---|
| --connector-name | 连接器名称,必须唯一 | postgres_connector_01 | 
| --connector-type | 连接器类型,固定为 Postgres | Postgres | 
| --config | JSON 格式的配置信息 | {"host": "localhost", "port": 5432, ...} | 
| --topic-id | 要监听的 MQTT 主题 | sensor/data | 
3. 高性能配置示例 
bash
# 创建高性能 PostgreSQL 连接器
robust-ctl mqtt connector create \
  --connector-name "high_perf_postgres" \
  --connector-type "Postgres" \
  --config '{"host": "postgres.example.com", "port": 5432, "database": "iot_data", "username": "iot_user", "password": "secure_password", "table": "sensor_data", "pool_size": 30, "enable_batch_insert": true, "enable_upsert": true, "conflict_columns": "client_id, topic"}' \
  --topic-id "iot/sensors/+/data"管理连接器 
1. 列出所有连接器 
bash
# 列出所有连接器
robust-ctl mqtt connector list
# 列出指定名称的连接器
robust-ctl mqtt connector list --connector-name "postgres_connector_01"2. 删除连接器 
bash
# 删除指定连接器
robust-ctl mqtt connector delete --connector-name "postgres_connector_01"完整操作示例 
场景:创建 IoT 数据存储系统 
bash
# 1. 创建传感器数据 PostgreSQL 连接器
robust-ctl mqtt connector create \
  --connector-name "iot_sensor_postgres" \
  --connector-type "Postgres" \
  --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "sensor_readings", "pool_size": 25, "enable_batch_insert": true}' \
  --topic-id "iot/sensors/+/readings"
# 2. 创建设备状态 PostgreSQL 连接器
robust-ctl mqtt connector create \
  --connector-name "device_status_postgres" \
  --connector-type "Postgres" \
  --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "device_status", "enable_upsert": true, "conflict_columns": "client_id"}' \
  --topic-id "iot/devices/+/status"
# 3. 创建告警消息 PostgreSQL 连接器
robust-ctl mqtt connector create \
  --connector-name "alarm_postgres" \
  --connector-type "Postgres" \
  --config '{"host": "postgres.iot.local", "port": 5432, "database": "iot_platform", "username": "iot_writer", "password": "iot_pass_2023", "table": "alarm_logs", "pool_size": 15}' \
  --topic-id "iot/alarms/#"
# 4. 查看创建的连接器
robust-ctl mqtt connector list
# 5. 测试连接器(发布测试消息)
robust-ctl mqtt publish \
  --username "test_user" \
  --password "test_pass" \
  --topic "iot/sensors/temp_001/readings" \
  --qos 1 \
  --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}'数据查询示例 
基本查询 
sql
-- 查询最近 1 小时的传感器数据
SELECT client_id, topic, timestamp, payload 
FROM mqtt_messages 
WHERE timestamp > EXTRACT(EPOCH FROM NOW() - INTERVAL '1 hour')
ORDER BY timestamp DESC;
-- 查询特定客户端的消息
SELECT topic, timestamp, payload 
FROM mqtt_messages 
WHERE client_id = 'sensor_001'
ORDER BY timestamp DESC
LIMIT 100;
-- 查询特定主题的消息
SELECT client_id, timestamp, payload 
FROM mqtt_messages 
WHERE topic LIKE 'sensor/temperature%'
ORDER BY timestamp DESC;聚合查询 
sql
-- 统计每小时的消息数量
SELECT 
    DATE_TRUNC('hour', TO_TIMESTAMP(timestamp)) as hour,
    COUNT(*) as message_count
FROM mqtt_messages 
WHERE timestamp > EXTRACT(EPOCH FROM NOW() - INTERVAL '24 hours')
GROUP BY hour
ORDER BY hour;
-- 统计各主题的消息数量
SELECT 
    topic,
    COUNT(*) as message_count,
    MIN(TO_TIMESTAMP(timestamp)) as first_message,
    MAX(TO_TIMESTAMP(timestamp)) as last_message
FROM mqtt_messages 
GROUP BY topic
ORDER BY message_count DESC;性能优化 
数据库优化 
- 表分区
sql
-- 按时间分区
CREATE TABLE mqtt_messages_2024_01 PARTITION OF mqtt_messages
FOR VALUES FROM (1704067200) TO (1706745600);- 定期清理
sql
-- 删除 30 天前的数据
DELETE FROM mqtt_messages 
WHERE timestamp < EXTRACT(EPOCH FROM NOW() - INTERVAL '30 days');- 连接池调优
sql
-- PostgreSQL 配置优化
max_connections = 200
shared_buffers = 256MB
effective_cache_size = 1GB
work_mem = 4MB连接器优化 
- 启用批量插入
json
{
  "enable_batch_insert": true,
  "pool_size": 30
}- 合理设置连接池大小
- 根据并发消息量调整 pool_size
- 监控数据库连接使用情况
- 使用 UPSERT 处理重复数据
json
{
  "enable_upsert": true,
  "conflict_columns": "client_id, topic"
}监控和故障排除 
日志监控 
连接器会输出详细的运行日志:
INFO  Connector postgres_connector_01 successfully connected to PostgreSQL database: mqtt_data
INFO  Connector postgres_connector_01 successfully wrote 100 records to PostgreSQL table mqtt_messages
ERROR Connector postgres_connector_01 failed to write data to PostgreSQL table mqtt_messages, error: connection timeout常见问题 
- 连接失败 - 检查网络连通性
- 验证数据库凭据
- 确认防火墙设置
 
- 写入性能低 - 启用批量插入
- 增加连接池大小
- 优化数据库索引
 
- 数据重复 - 启用 UPSERT 功能
- 配置合适的冲突列
 
总结 
PostgreSQL 连接器是 RobustMQ 数据集成系统的重要组件,提供了高性能的关系型数据库桥接能力。通过合理的配置和使用,可以满足 IoT 数据存储、历史数据分析、数据持久化和业务数据集成等多种业务需求。
该连接器充分利用了 PostgreSQL 的 ACID 特性和丰富的查询能力,结合 Rust 语言的内存安全和零成本抽象优势,实现了高效、可靠的数据存储,是构建现代化 IoT 数据平台和分析系统的重要工具。
