Skip to content

本地文件连接器

概述

本地文件连接器是 RobustMQ 提供的数据集成组件,用于将 MQTT 消息写入本地文件系统。该连接器简单可靠,适合数据备份、日志记录和离线分析等场景。

配置说明

连接器配置

本地文件连接器使用 LocalFileConnectorConfig 结构进行配置:

rust
pub struct LocalFileConnectorConfig {
    pub local_file_path: String,       // 本地文件路径
    pub rotation_strategy: RotationStrategy,  // 文件滚动策略
    pub max_size_gb: u64,              // 最大文件大小(GB)
}

pub enum RotationStrategy {
    None,    // 不滚动
    Size,    // 按大小滚动
    Hourly,  // 按小时滚动
    Daily,   // 按天滚动
}

配置参数

参数名类型必填默认值说明示例
local_file_pathString-本地文件的完整路径/var/log/mqtt_messages.log
rotation_strategyStringnone文件滚动策略:none(不滚动)、size(按大小滚动)、hourly(按小时滚动)、daily(按天滚动)daily
max_size_gbNumber1文件最大大小(GB),仅在 rotation_strategysize 时生效,范围:1-105

配置示例

JSON 配置格式

基本配置(无滚动)

json
{
  "local_file_path": "/var/log/mqtt_messages.log"
}

按大小滚动

json
{
  "local_file_path": "/var/log/mqtt_messages.log",
  "rotation_strategy": "size",
  "max_size_gb": 5
}

按小时滚动

json
{
  "local_file_path": "/var/log/mqtt_messages.log",
  "rotation_strategy": "hourly"
}

按天滚动

json
{
  "local_file_path": "/var/log/mqtt_messages.log",
  "rotation_strategy": "daily"
}

完整连接器配置

无滚动配置

json
{
  "cluster_name": "my_cluster",
  "connector_name": "file_connector",
  "connector_type": "LocalFile",
  "config": "{\"local_file_path\": \"/var/log/mqtt_messages.log\"}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": 1,
  "create_time": 1640995200,
  "update_time": 1640995200
}

带滚动策略配置

json
{
  "cluster_name": "my_cluster",
  "connector_name": "file_connector",
  "connector_type": "LocalFile",
  "config": "{\"local_file_path\": \"/var/log/mqtt_messages.log\", \"rotation_strategy\": \"daily\"}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": 1,
  "create_time": 1640995200,
  "update_time": 1640995200
}

消息格式

存储格式

本地文件连接器将 MQTT 消息转换为 JSON 格式存储,每个消息占一行。

消息结构

json
{
  "offset": 12345,
  "header": [
    {
      "name": "topic",
      "value": "sensor/temperature"
    },
    {
      "name": "qos",
      "value": "1"
    }
  ],
  "key": "sensor_001",
  "data": "eyJ0ZW1wZXJhdHVyZSI6IDI1LjUsICJodW1pZGl0eSI6IDYwfQ==",
  "tags": ["sensor", "temperature"],
  "timestamp": 1640995200,
  "crc_num": 1234567890
}

字段说明

字段名类型说明
offsetNumber消息偏移量
headerArray消息头信息数组
keyString消息键值
dataString消息内容(Base64 编码)
tagsArray消息标签数组
timestampNumber消息时间戳(秒)
crc_numNumber消息 CRC 校验值

文件滚动策略

功能说明

本地文件连接器支持自动文件滚动(rotation)功能,可以根据文件大小或时间自动创建新文件,避免单个文件过大。

滚动策略类型

1. None(不滚动)

  • 默认策略,所有数据写入同一个文件
  • 适合消息量较小的场景

2. Size(按大小滚动)

  • 当文件达到指定大小时自动滚动
  • 大小范围:1GB - 10GB
  • 滚动后的文件名格式:原文件名_YYYYMMdd_HHMMSS.扩展名
  • 示例:mqtt_messages_20231215_143025.log

3. Hourly(按小时滚动)

  • 每小时自动创建新文件
  • 滚动后的文件名格式:原文件名_YYYYMMdd_HH.扩展名
  • 示例:mqtt_messages_20231215_14.log

4. Daily(按天滚动)

  • 每天自动创建新文件
  • 滚动后的文件名格式:原文件名_YYYYMMdd.扩展名
  • 示例:mqtt_messages_20231215.log

使用示例

按大小滚动(5GB)

bash
robust-ctl mqtt connector create \
  --connector-name "file_size_rotation" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/mqtt.log", "rotation_strategy": "size", "max_size_gb": 5}' \
  --topic-id "sensor/data"

按小时滚动

bash
robust-ctl mqtt connector create \
  --connector-name "file_hourly_rotation" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/mqtt.log", "rotation_strategy": "hourly"}' \
  --topic-id "sensor/data"

按天滚动

bash
robust-ctl mqtt connector create \
  --connector-name "file_daily_rotation" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/mqtt.log", "rotation_strategy": "daily"}' \
  --topic-id "sensor/data"

滚动行为

  1. 文件检查:在写入数据前检查是否需要滚动
  2. 文件重命名:当前文件被重命名为带时间戳的文件
  3. 创建新文件:使用原始文件名创建新文件
  4. 继续写入:新数据写入新文件

注意事项

  • 滚动操作是原子性的,不会丢失数据
  • 旧文件需要手动清理或通过日志轮转工具管理
  • 按大小滚动时,实际文件大小可能略大于设置值(因为是批量写入后检查)
  • 按时间滚动时,精确度为秒级

使用 robust-ctl 创建文件连接器

基本语法

使用 robust-ctl 命令行工具可以方便地创建和管理本地文件连接器:

bash
robust-ctl mqtt connector create \
  --connector-name <连接器名> \
  --connector-type <连接器类> \
  --config <> \
  --topic-id <主题ID>

创建本地文件连接器

1. 基本创建命令

bash
# 创建本地文件连接器
robust-ctl mqtt connector create \
  --connector-name "file_connector_01" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/mqtt_messages.log"}' \
  --topic-id "sensor/data"

2. 参数说明

参数说明示例值
--connector-name连接器名称,必须唯一file_connector_01
--connector-type连接器类型,固定为 LocalFileLocalFile
--configJSON 格式的配置信息{"local_file_path": "/path/to/file.log"}
--topic-id要监听的 MQTT 主题sensor/data

3. 配置示例

bash
# 创建传感器数据记录连接器
robust-ctl mqtt connector create \
  --connector-name "sensor_logger" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/sensor_data.log"}' \
  --topic-id "sensors/+/data"

管理连接器

1. 列出所有连接器

bash
# 列出所有连接器
robust-ctl mqtt connector list

# 列出指定名称的连接器
robust-ctl mqtt connector list --connector-name "file_connector_01"

2. 删除连接器

bash
# 删除指定连接器
robust-ctl mqtt connector delete --connector-name "file_connector_01"

完整操作示例

场景:创建 IoT 传感器数据记录系统

bash
# 1. 创建传感器数据连接器
robust-ctl mqtt connector create \
  --connector-name "iot_sensor_logger" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/iot_sensors.log"}' \
  --topic-id "iot/sensors/+/data"

# 2. 创建设备状态连接器
robust-ctl mqtt connector create \
  --connector-name "device_status_logger" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/device_status.log"}' \
  --topic-id "iot/devices/+/status"

# 3. 创建告警消息连接器
robust-ctl mqtt connector create \
  --connector-name "alarm_logger" \
  --connector-type "LocalFile" \
  --config '{"local_file_path": "/var/log/robustmq/alarms.log"}' \
  --topic-id "iot/alarms/#"

# 4. 查看创建的连接器
robust-ctl mqtt connector list

# 5. 测试连接器(发布测试消息)
robust-ctl mqtt publish \
  --username "test_user" \
  --password "test_pass" \
  --topic "iot/sensors/temp_001/data" \
  --qos 1 \
  --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}'

总结

本地文件连接器是 RobustMQ 数据集成系统的重要组成部分,提供了简单可靠的消息持久化能力。通过合理的配置和使用,可以满足数据备份、日志记录和离线分析等多种业务需求。

该连接器充分发挥了 Rust 语言的优势,在保证高性能的同时,提供了内存安全和并发安全的特性,是构建可靠 IoT 数据管道的重要工具。