Skip to content

Elasticsearch 连接器

概述

Elasticsearch 连接器是 RobustMQ 提供的数据集成组件,用于将 MQTT 消息桥接到 Elasticsearch 搜索和分析引擎。该连接器支持将实时 MQTT 数据流写入 Elasticsearch 索引,适合日志分析、全文搜索、数据可视化和实时监控等场景。

配置说明

连接器配置

Elasticsearch 连接器使用 ElasticsearchConnectorConfig 结构进行配置:

rust
pub struct ElasticsearchConnectorConfig {
    pub url: String,                  // Elasticsearch 服务器地址
    pub index: String,                // 索引名称
    pub auth_type: ElasticsearchAuthType,  // 认证类型
    pub username: Option<String>,     // 用户名(Basic 认证)
    pub password: Option<String>,     // 密码(Basic 认证)
    pub api_key: Option<String>,      // API 密钥(ApiKey 认证)
    pub enable_tls: bool,             // 是否启用 TLS
    pub ca_cert_path: Option<String>, // CA 证书路径
    pub timeout_secs: u64,            // 请求超时时间(秒)
    pub max_retries: u32,             // 最大重试次数
}

pub enum ElasticsearchAuthType {
    None,    // 无认证
    Basic,   // 基本认证
    ApiKey,  // API 密钥认证
}

配置参数

参数名类型必填默认值说明示例
urlString-Elasticsearch 服务器地址,长度不超过 512 字符http://localhost:9200
indexString-Elasticsearch 索引名称,长度不超过 256 字符mqtt_messages
auth_typeStringnone认证类型:nonebasicapikeybasic
usernameString-用户名,Basic 认证时必填,长度不超过 256 字符elastic
passwordString-密码,Basic 认证时必填,长度不超过 256 字符password123
api_keyString-API 密钥,ApiKey 认证时必填api_key_value
enable_tlsBooleanfalse是否启用 TLS 加密连接true
ca_cert_pathString-CA 证书文件路径(TLS 连接时使用)/etc/certs/ca.crt
timeout_secsNumber30请求超时时间(秒),范围:1-30060
max_retriesNumber3最大重试次数,不超过 105

配置示例

JSON 配置格式

无认证配置

json
{
  "url": "http://localhost:9200",
  "index": "mqtt_messages"
}

Basic 认证配置

json
{
  "url": "http://localhost:9200",
  "index": "mqtt_messages",
  "auth_type": "basic",
  "username": "elastic",
  "password": "password123"
}

API Key 认证配置

json
{
  "url": "https://elasticsearch.example.com:9200",
  "index": "mqtt_messages",
  "auth_type": "apikey",
  "api_key": "your_api_key_here",
  "enable_tls": true,
  "timeout_secs": 60,
  "max_retries": 5
}

TLS 加密连接配置

json
{
  "url": "https://elasticsearch.example.com:9200",
  "index": "mqtt_messages",
  "auth_type": "basic",
  "username": "elastic",
  "password": "password123",
  "enable_tls": true,
  "ca_cert_path": "/etc/certs/ca.crt"
}

完整连接器配置

json
{
  "cluster_name": "default",
  "connector_name": "elasticsearch_connector_01",
  "connector_type": "Elasticsearch",
  "config": "{\"url\": \"http://localhost:9200\", \"index\": \"mqtt_messages\", \"auth_type\": \"basic\", \"username\": \"elastic\", \"password\": \"password123\"}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": null,
  "create_time": 1640995200,
  "update_time": 1640995200
}

消息格式

传输格式

Elasticsearch 连接器将 MQTT 消息转换为 JSON 格式后批量写入 Elasticsearch 索引。

消息结构

json
{
  "offset": 12345,
  "header": [
    {
      "name": "topic",
      "value": "sensor/temperature"
    },
    {
      "name": "qos",
      "value": "1"
    }
  ],
  "key": "sensor_001",
  "data": "eyJ0ZW1wZXJhdHVyZSI6IDI1LjUsICJodW1pZGl0eSI6IDYwfQ==",
  "tags": ["sensor", "temperature"],
  "timestamp": 1640995200,
  "crc_num": 1234567890
}

字段说明

字段名类型说明
offsetNumber消息偏移量
headerArray消息头信息数组
keyString消息键值
dataString消息内容(Base64 编码)
tagsArray消息标签数组
timestampNumber消息时间戳(秒)
crc_numNumber消息 CRC 校验值

使用 robust-ctl 创建 Elasticsearch 连接器

基本语法

使用 robust-ctl 命令行工具可以方便地创建和管理 Elasticsearch 连接器:

bash
robust-ctl mqtt connector create \
  --connector-name <连接器名> \
  --connector-type <连接器类> \
  --config <> \
  --topic-id <主题ID>

创建 Elasticsearch 连接器

1. 基本创建命令

bash
# 创建 Elasticsearch 连接器(无认证)
robust-ctl mqtt connector create \
  --connector-name "elasticsearch_connector_01" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "mqtt_messages"}' \
  --topic-id "sensor/data"

2. 参数说明

参数说明示例值
--connector-name连接器名称,必须唯一elasticsearch_connector_01
--connector-type连接器类型,固定为 ElasticsearchElasticsearch
--configJSON 格式的配置信息{"url": "http://localhost:9200", "index": "mqtt_messages"}
--topic-id要监听的 MQTT 主题sensor/data

3. 配置示例

无认证连接

bash
robust-ctl mqtt connector create \
  --connector-name "es_no_auth" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "mqtt_logs"}' \
  --topic-id "logs/#"

Basic 认证连接

bash
robust-ctl mqtt connector create \
  --connector-name "es_basic_auth" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "mqtt_messages", "auth_type": "basic", "username": "elastic", "password": "password123"}' \
  --topic-id "sensor/+/data"

API Key 认证连接(带 TLS)

bash
robust-ctl mqtt connector create \
  --connector-name "es_secure" \
  --connector-type "Elasticsearch" \
  --config '{"url": "https://es.example.com:9200", "index": "mqtt_secure", "auth_type": "apikey", "api_key": "your_key", "enable_tls": true, "timeout_secs": 60, "max_retries": 5}' \
  --topic-id "secure/data"

管理连接器

1. 列出所有连接器

bash
# 列出所有连接器
robust-ctl mqtt connector list

# 列出指定名称的连接器
robust-ctl mqtt connector list --connector-name "elasticsearch_connector_01"

2. 删除连接器

bash
# 删除指定连接器
robust-ctl mqtt connector delete --connector-name "elasticsearch_connector_01"

完整操作示例

场景:创建 IoT 数据搜索分析系统

bash
# 1. 创建传感器数据 Elasticsearch 连接器
robust-ctl mqtt connector create \
  --connector-name "iot_sensor_es" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "iot_sensors", "auth_type": "basic", "username": "elastic", "password": "elastic123"}' \
  --topic-id "iot/sensors/+/data"

# 2. 创建设备状态 Elasticsearch 连接器
robust-ctl mqtt connector create \
  --connector-name "device_status_es" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "device_status", "auth_type": "basic", "username": "elastic", "password": "elastic123"}' \
  --topic-id "iot/devices/+/status"

# 3. 创建告警消息 Elasticsearch 连接器
robust-ctl mqtt connector create \
  --connector-name "alarm_es" \
  --connector-type "Elasticsearch" \
  --config '{"url": "http://localhost:9200", "index": "alarms", "auth_type": "basic", "username": "elastic", "password": "elastic123", "timeout_secs": 60}' \
  --topic-id "iot/alarms/#"

# 4. 查看创建的连接器
robust-ctl mqtt connector list

# 5. 测试连接器(发布测试消息)
robust-ctl mqtt publish \
  --username "test_user" \
  --password "test_pass" \
  --topic "iot/sensors/temp_001/data" \
  --qos 1 \
  --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}'

Elasticsearch 查询示例

连接器将数据写入 Elasticsearch 后,可以使用 Elasticsearch 的 REST API 或 Kibana 进行查询和分析。

1. 查询所有消息

bash
curl -X GET "localhost:9200/mqtt_messages/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{
    "query": {
      "match_all": {}
    }
  }'

2. 按时间范围查询

bash
curl -X GET "localhost:9200/mqtt_messages/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{
    "query": {
      "range": {
        "timestamp": {
          "gte": 1640995200,
          "lte": 1641081600
        }
      }
    }
  }'

3. 全文搜索

bash
curl -X GET "localhost:9200/mqtt_messages/_search?pretty" \
  -H 'Content-Type: application/json' \
  -d '{
    "query": {
      "match": {
        "data": "temperature"
      }
    }
  }'

性能优化建议

1. 批量写入

  • 连接器默认使用批量写入(Bulk API)提高性能
  • 建议根据消息量调整批量大小

2. 索引设置

  • 根据数据量合理设置索引分片数
  • 使用索引生命周期管理(ILM)自动管理旧数据
  • 建议使用时间序列索引模式(如:mqtt_messages-2023.12.15

3. 连接优化

  • 对于高吞吐量场景,适当增加 timeout_secsmax_retries
  • 使用连接池减少连接开销
  • 考虑使用 Elasticsearch 集群提高可用性

4. 安全建议

  • 生产环境建议启用 TLS 加密
  • 使用 API Key 认证替代 Basic 认证
  • 定期轮换 API 密钥
  • 使用最小权限原则配置 Elasticsearch 用户

监控和故障排查

1. 查看连接器状态

bash
robust-ctl mqtt connector list --connector-name "elasticsearch_connector_01"

2. 查看 Elasticsearch 索引状态

bash
curl -X GET "localhost:9200/_cat/indices/mqtt_messages?v"

3. 常见问题

问题 1:连接失败

  • 检查 Elasticsearch 服务是否运行
  • 验证 URL 地址是否正确
  • 检查网络连接和防火墙设置

问题 2:认证失败

  • 验证用户名和密码是否正确
  • 检查 API Key 是否有效
  • 确认用户权限是否足够

问题 3:写入超时

  • 增加 timeout_secs 配置
  • 检查 Elasticsearch 服务负载
  • 考虑增加 Elasticsearch 集群资源

总结

Elasticsearch 连接器是 RobustMQ 数据集成系统的强大组件,提供了与 Elasticsearch 搜索引擎的无缝集成能力。通过合理的配置和使用,可以实现:

  • 实时搜索:MQTT 消息实时写入 Elasticsearch,支持全文搜索和聚合分析
  • 数据可视化:结合 Kibana 实现数据的可视化展示和监控
  • 灵活认证:支持多种认证方式,满足不同的安全需求
  • 高性能:批量写入和连接优化确保高吞吐量场景下的性能
  • 易于集成:简单的配置即可将 MQTT 数据桥接到 Elasticsearch

该连接器充分利用了 Rust 语言的性能优势和 Elasticsearch 的搜索能力,是构建现代化 IoT 数据分析和监控系统的理想选择。