Skip to content

RobustMQ 规则引擎设计思考

背景与问题

RobustMQ 支持 MQTT、Kafka 多协议,未来还会接入更多协议。每种协议都有数据进入和流出的节点,这些节点上往往需要做数据清洗和转换:

  • MQTT 设备上报的原始字节流,需要解析成结构化数据再路由
  • Kafka 消息进入时,需要字段过滤、格式转换后再写入存储或转发给消费者
  • Connector 场景(类似 Kafka Connect)中,Source 和 Sink 两端的数据格式通常不一致,需要在中间做映射和转换

现有的解法要么是让用户自己写代码接一套处理框架(Flink、Kafka Streams),要么是用各家 Broker 自己的规则引擎——但这些引擎都是协议绑定的,EMQX 的规则只能用在 EMQX 上,Kafka Streams 只能处理 Kafka 消息。

RobustMQ 的思路是:在内核层面内置一套统一的无状态规则引擎,覆盖所有协议通道上的数据处理需求。不做 Flink 那样的通用流计算框架,只做无状态的数据清洗和转换——这件事在 80% 的场景里已经足够。

设计边界

明确做什么、不做什么:

  • 无状态的逐条消息处理:过滤、字段变换、类型转换、格式转换、路由
  • 覆盖 MQTT、Kafka 及后续协议的所有数据进出节点
  • 覆盖 Connector 场景(Source → Transform → Sink)的中间转换层
  • 轻量,可在边缘设备上运行,单二进制无外部依赖

不做

  • 有状态的流计算(窗口聚合、Join、CEP)——这是 Flink 的领域
  • 跨消息的关联和累积——留给外部处理框架

这个边界保证了引擎的复杂度可控,同时能覆盖大多数实际场景。

整体架构

规则引擎分三层:

  MQTT Channel    Kafka Channel    Future Protocols
       │                │                 │
       └────────────────┴─────────────────┘

              ┌─────────▼──────────┐
              │   规则引擎(挂载点)  │  ← 每个协议通道都可挂载规则
              └─────────┬──────────┘

       ┌────────────────┼─────────────────┐
       ▼                ▼                 ▼
  MQTT Sink       Kafka Sink        Connector Sink
┌─────────────────────────────────────┐
│  AI 层(自然语言 → 规则生成)          │
│  MCP Server / Chat / Dashboard      │
├─────────────────────────────────────┤
│  规则规范层(Schema)                 │
│  算子集合 + 组合方式 + JSON 描述      │
├─────────────────────────────────────┤
│  执行引擎层(Runtime)                │
│  Rust 实现 + CEL 表达式 + 函数注册表  │
└─────────────────────────────────────┘

执行引擎层:纯计算、无状态。每条消息进来,按顺序执行一组算子,输出变换后的消息或丢弃。不依赖外部服务,可编译为单一二进制。

规则规范层:定义系统能做什么,是执行引擎的输入协议。规则是算子的有序列表,每个算子有明确的类型、参数和语义。用 JSON 表达,可通过 Dashboard 配置或 API 下发。

AI 层:用自然语言描述需求,AI 将其翻译为规则 JSON,用户确认后下发执行。AI 不参与执行,只负责生成。

执行引擎层(Runtime)

核心模型

执行引擎的核心模型极简:一条消息进来,经过一组有序算子,输出一条(或零条)消息出去。

Message In


┌──────────┐
│ Op 1     │  filter: payload.temp > 0
├──────────┤
│ Op 2     │  set: payload.temp_f = payload.temp * 1.8 + 32
├──────────┤
│ Op 3     │  delete: payload.raw_bytes
├──────────┤
│ Op 4     │  route: topic = "processed/${device_id}"
└──────────┘


Message Out(或 Drop)

任意一个 filter 算子返回 false,消息直接丢弃,后续算子不再执行。这个模型是纯函数式的,没有副作用,每条消息完全独立处理。

表达式引擎:CEL 的角色

CEL 不是 Runtime 的全部,只负责动态表达式求值这一件事,具体是两个算子:filter 的过滤条件,和 set 的字段计算表达式。

# filter 的 expr 字段
payload.temp > 0 && payload.device_id != ""

# set 的 expr 字段(计算新字段值)
payload.temp * 1.8 + 32
string(payload.ts) + "_" + payload.device_id

renamedeletekeep_onlybytes_decodeunit_convertcsv_decode 这些算子和 CEL 完全无关,都是 Rust 直接实现的固定逻辑。

选择 CEL(Common Expression Language) 的理由:Google 开源,Rust 实现为 cel-rust,被 Kubernetes、Envoy、Firebase 广泛采用,类型安全、沙箱隔离(不能执行任意代码),且 AI 对 CEL 语法的生成质量高(训练数据丰富)。

消息模型

引擎内部统一使用结构化的消息对象,不依赖具体协议格式:

Message {
    topic:    String,          // 消息路由键
    payload:  Map<String, Any>, // 结构化内容,已解码
    headers:  Map<String, String>, // 协议头部
    metadata: Map<String, Any>, // 来源协议、时间戳等元信息
}

各协议层在消息进入规则引擎前负责将原始格式转换为此结构,规则引擎不感知协议细节。这也是多协议共用同一套算子的基础。

算子实现分类

Runtime 的主体工作是实现 50+ 个算子,但不是每个都需要从零写。按实现方式分三类:

直接用现有 crate,几乎不用写代码

算子依赖 crate
JSON 字段访问、多级路径提取serde_json.pointer("/a/b/c")
JSONPath 查询jsonpath-rustserde_json_path
jq 语法jaq(纯 Rust 实现)
正则匹配/提取regex
Base64base64
Gzip/压缩flate2
Protobufprost
Avroapache-avro
CSVcsv
MessagePackrmp-serde
时间处理chrono
哈希/HMACsha2 + hmac

薄薄包一层,写少量胶水代码

  • 字符串操作(trim/split/replace/upper/lower)→ Rust 标准库本身支持,包装成算子接口
  • 类型转换(to_int/to_float)→ 标准库 parse(),加错误处理
  • 数学函数 → 标准库 f64 方法

需要自己实现逻辑的

  • unit_convert — 维护一个工程单位映射表和换算公式
  • bytes_decode — 按 int16_be / float32_le 等格式解析原始字节流
  • normalize — 一行公式,但需要自己封装
  • threshold_alert — 超阈值注入标签的逻辑
  • template_render${field} 语法替换
  • moving_avg — 带简单状态的滑动平均(边缘场景)
  • 框架层:Rule 匹配、算子链调度、on_error 分支处理

真正需要从零写逻辑的算子不多,大头是边缘专属那几个和框架层本身。

性能考虑

  • 规则在首次加载时编译(CEL 表达式预编译、函数注册),运行时只执行已编译的指令,无解析开销
  • 算子链是 Vec<Box<dyn Operator>>,顺序执行,无动态调度以外的额外开销
  • 目标是单核处理能力不低于协议层本身的吞吐上限,规则引擎不应成为瓶颈

规则规范层(Schema)

规则结构

一条 Rule 的完整结构由四部分组成:匹配条件、N 个算子、输出目标、失败处理策略。

json
{
  "id": "rule-001",
  "name": "温度单位转换",
  "match": {
    "topics": ["devices/+/telemetry"],
    "protocols": ["mqtt"]
  },
  "ops": [
    { "type": "filter",          "expr": "payload.temp != null" },
    { "type": "set",             "field": "payload.temp_f", "expr": "payload.temp * 1.8 + 32" },
    { "type": "unit_convert",    "field": "payload.pressure", "from": "Pa", "to": "bar" },
    { "type": "delete",          "fields": ["payload.raw"] },
    { "type": "threshold_alert", "field": "payload.temp_f", "max": 176, "label": "overheat" }
  ],
  "sink": {
    "type": "topic",
    "target": "processed/${metadata.device_id}"
  },
  "on_error": {
    "strategy": "deadletter",
    "topic": "$rule/rule-001/deadletter"
  }
}

ops 是有序的算子链,算子数量没有限制,前一个算子的输出是后一个算子的输入。任意一个 filter 返回 false,整条链终止,消息被丢弃,不触发 on_error(filter 失败是正常的过滤行为,不是错误)。

sink 输出目标支持多种类型:

topic     → 发到另一个 Topic(内部流转)
drop      → 静默丢弃(只做过滤,不需要输出)
webhook   → 发到外部 HTTP 接口
storage   → 写入持久化存储(后续扩展)

on_error 失败处理,处理算子执行失败的情况(字段不存在、类型转换失败、字节格式错误等):

drop        → 静默丢弃,不输出(默认)
passthrough → 跳过失败的算子,消息继续往后走
deadletter  → 发到专用 dead letter topic,保留现场,可事后分析和重投
abort       → 终止并记录错误日志

生产环境推荐 deadletter:失败的消息不丢失,运维可查看原因,修复规则后重新投递。

算子的类型系统

每个算子有精确的类型定义,参数有明确的类型约束。这是 AI 能可靠生成规则的前提——模糊的算子定义会导致 AI 生成的规则语义不确定。

Operator
  ├── Filter    { expr: CelExpr }
  ├── Set       { field: FieldPath, expr: CelExpr }
  ├── Delete    { fields: Vec<FieldPath> }
  ├── Rename    { from: FieldPath, to: FieldPath }
  ├── KeepOnly  { fields: Vec<FieldPath> }
  ├── Flatten   { prefix: Option<String> }
  ├── Decode    { field: FieldPath, codec: Codec }   // json/protobuf/avro/csv/bytes
  ├── Encode    { field: FieldPath, codec: Codec }
  ├── Convert   { field: FieldPath, from: Unit, to: Unit }
  ├── Normalize { field: FieldPath, min: f64, max: f64 }
  ├── Alert     { field: FieldPath, min: Option<f64>, max: Option<f64>, label: String }
  └── Route     { target: Template }

多条 Rule 的执行模型

多条 Rule 之间是独立并行的,不是串联的。同一条消息可以同时命中多条 Rule,每条 Rule 独立拿到原始消息的副本,独立执行,独立输出,互不影响。

原始消息 ──┬──▶ Rule-001 算子链 ──▶ 输出到 processed/...

           └──▶ Rule-002 算子链 ──▶ 输出到 archive/...

Rule-001 的输出不会流入 Rule-002。如果需要多步处理,做法是在同一条 Rule 内部串联多个算子,而不是多条 Rule 首尾相接。

如果确实需要"Rule A 的输出作为 Rule B 的输入",可以让 Rule A 输出到一个中间 Topic,Rule B 匹配该 Topic——通过 Topic 路由模拟链式处理。但大多数场景用一条 Rule 内的多算子链就足够了。

Schema 的版本管理

规则 Schema 需要版本控制。引擎升级引入新算子时,旧规则仍然可以正确执行;废弃算子时有明确的迁移路径。Schema 版本和引擎版本解耦,规则文件可以在不同版本的引擎间迁移。

规则的存储与分发

规则通过 Meta Service 存储,和现有集群机制复用。规则变更时,Meta Service 通知所有 Broker 节点热更新,不需要重启,不影响正在处理的消息。


AI 层(自然语言 → 规则生成)

核心问题

AI 层要解决的问题是:把用户的自然语言描述,可靠地转换为符合 Schema 的规则 JSON。

"可靠"是关键词。AI 生成的规则不能有歧义,不能生成引擎不认识的算子,不能生成参数类型错误的规则。

实现思路

Prompt 工程 + Schema 约束

将完整的算子 Schema(每个算子的类型、参数、语义描述)注入到 Prompt 中,AI 在生成规则时只能使用已定义的算子,参数类型由 Schema 约束。生成完成后,在下发执行前做一次 Schema 校验,不合法的规则不会进入引擎。

这样的好处是:AI 的幻觉问题被 Schema 校验兜底,即使生成了错误的算子名或参数类型,校验会拦截并返回错误,不会产生运行时异常。

上下文增强

用户描述时往往不会指定字段的完整路径,只说"温度字段"。AI 层需要结合当前 Topic 的消息样本(用户可以上传一条示例消息)来推断实际字段名,生成精确的 payload.temperature 而不是模糊的 temperature

交互确认

AI 生成规则后,展示人类可读的描述(不是 JSON)给用户确认:

将生成以下规则:
  1. 过滤:丢弃 temp 字段为空的消息
  2. 转换:将 temp(摄氏度)转换为华氏度,写入 temp_f 字段
  3. 告警:temp_f 超过 176°F 时在消息中标记 alert=overheat
  4. 路由:输出到 processed/${device_id}

确认生效?[是 / 修改]

用户确认后,后台将规则 JSON 写入 Meta Service。用户全程不需要看到 JSON。

MCP Server 接口

MCP Server 暴露的核心工具:

Tool说明
create_rule从自然语言描述创建规则
list_rules查询当前生效的规则列表
update_rule修改规则(支持自然语言描述变更)
delete_rule删除规则
test_rule用一条示例消息测试规则,返回输出结果
explain_rule将规则 JSON 解释为人类可读描述

test_rule 是其中最有价值的工具:在规则生效前,用一条真实消息跑一遍,看输出是否符合预期,降低配置错误进生产的风险。

无状态算子集合

以下是计划实现的完整算子集合,全部无状态,每条消息独立处理。

类别算子说明
过滤与条件filter条件过滤,CEL 表达式,支持 AND/OR/NOT/比较/IN/BETWEEN
coalesce返回第一个非空值
case_when条件分支,按条件输出不同字段值
字段操作select选取指定字段
rename字段重命名
set新增或覆盖字段,值支持 CEL 表达式
delete删除字段
keep_only只保留指定字段,其余全删
flatten嵌套 JSON 展平为顶层字段
nest将多个字段打包为嵌套结构
merge合并两个 Map 对象
类型转换to_int / to_float / to_string / to_bool基本类型互转
float(x, decimals)浮点数保留指定小数位
字符串trim / upper / lower去空白、大小写转换
split / concat / replace分割、拼接、替换
regex_match / regex_extract正则匹配与捕获组提取
starts_with / ends_with / contains包含判断
url_encode / url_decodeURL 编解码
template_render字符串模板渲染,支持 ${field} 语法
JSONjson_decode / json_encodeJSON 序列化/反序列化
map_get / map_put / map_keys / map_valuesMap 字段读写
json_pathJSONPath 查询,$.store.book[*].author
jqjq 语法完整支持,复杂 JSON 变换
数组map_array / filter_array对数组每个元素应用函数或条件过滤
flatten_array / sort_array展平嵌套数组、排序
first / last / nth取首尾或第 N 个元素
zip两个数组合并为对象数组
数学abs / ceil / floor / round / sqrt基础数学运算
log / log10 / exp / power指数与对数
clamp(x, min, max)区间截断,超出范围取边界值
normalize(x, min, max)归一化到 [0, 1],AI 训练数据预处理 ⭐
unit_convert(x, from, to)工程单位换算,内置单位库(℃/℉、Pa/bar、mV/V 等)⭐
时间now_timestamp / now_rfc3339当前时间戳/RFC3339 字符串
unix_ts_to_rfc3339 / rfc3339_to_unix_ts时间戳与 RFC3339 互转
format_date按格式字符串格式化日期
date_diff计算两个时间点的差值
timezone_convert时区转换
编解码base64_encode / base64_decodeBase64
bin2hexstr / hexstr2binHex 编解码
gzip / gunzip压缩/解压
protobuf_decode / protobuf_encodeProtobuf,需提供 schema
avro_decode / avro_encodeAvro
msgpack_decode / msgpack_encodeMessagePack
csv_decode / csv_encodeCSV 字符串与结构化数据互转 ⭐
bytes_decode(payload, format)按格式解析原始字节流,支持 int16_befloat32_le 等工业数据格式 ⭐
哈希/安全md5 / sha256 / sha512 / hmac_sha256哈希与 HMAC
aes_encrypt / aes_decrypt对称加密
uuid4生成 UUID
边缘专属threshold_alert(x, min, max)超阈值自动在消息中注入告警标签 ⭐
moving_avg(field, n)N 点移动平均,用于边缘侧简单降噪 ⭐
deduplicate(field, ttl_ms)按字段值去重,基于本地 TTL 滑动窗口 ⭐

标注 ⭐ 的是 EMQX 规则引擎不具备、面向边缘和 AI 场景的差异化算子。

用户接入方式

核心设计原则:用户尽量不需要手动配置算子。

规则的主要生成方式是自然语言,手动配置只是兜底选项。

1. 自然语言生成(主路径)

无论是 Dashboard 还是外部 AI 工具,用户都通过自然语言描述需求,系统生成规则后展示给用户确认,确认后直接生效。Dashboard 内置对话框,不需要跳转到外部工具。

用户输入:把 payload.temp 从摄氏度转成华氏度,超过 80 度加 alert=true 字段

AI 生成规则 JSON(展示给用户预览)

用户确认 → 规则写入并生效

2. MCP Server(AI 工具直连)

RobustMQ 将规则引擎管理接口暴露为 MCP Server,任何支持 MCP 的 AI 工具(Claude、Cursor、各类 AI Agent)都可以直接操作。用户在习惯的 AI 工具里描述需求,规则通过 MCP 写入 RobustMQ,不需要打开 Dashboard。

这是面向开发者的场景:在开发阶段直接在 IDE 里配置数据处理规则,和写代码在同一个工作流里。

3. API 直接下发(兜底)

通过 HTTP API 直接提交规则 JSON,适合需要版本管理或 CI/CD 自动化的场景。手动编写规则 JSON 是最后的兜底方式,正常使用不应该需要这一步。


三种方式底层执行同一套算子引擎。自然语言能覆盖的场景越多,用户需要手动接触算子细节的情况就越少——这也是算子集合需要语义精确、覆盖完整的原因:它是 AI 生成规则的基础,而不是用户直接操作的界面。

与现有方案的对比

EMQX 规则引擎Kafka StreamsFlinkRobustMQ 规则引擎
部署方式与 Broker 绑定独立 JVM 服务独立集群内核内置,边缘/云端通用
表达能力SQL 方言Java DSLJava/SQL算子集合 + CEL
工业数据支持内置 bytes_decode、unit_convert
AI 友好MCP Server + 标准化算子
边缘部署不支持不支持不支持单二进制,无外部依赖
适合场景MQTT 简单过滤Kafka 流计算复杂有状态计算IoT 数据清洗、边缘预处理

RobustMQ 规则引擎的定位不是替代 Flink,而是覆盖 Flink 触及不到的场景:边缘设备上的轻量数据清洗、IoT 数据进 Broker 前的预处理、Connector 中间层的格式映射、不想引入外部框架的简单转换需求。

换句话说:用户在 RobustMQ 内部就能解决 80% 的数据清洗需求,不需要为此单独部署一套 Flink 集群。

值得特别说明的是:EMQX 的 Flow Designer 每条规则最多只支持一个 Function 节点 + 一个 Filter 节点,多步处理要么写复杂 SQL,要么串联多条规则。RobustMQ 的算子链支持任意 N 个算子顺序组合,表达能力更强,配置更直观。


总结

我们要做什么

在 RobustMQ 内核层面内置一套统一的无状态规则引擎,覆盖 MQTT、Kafka 及后续所有协议通道上的数据处理需求。目标是让用户在不引入任何外部框架的情况下,完成 80% 的数据清洗和转换工作。

设想是什么样子

用户不需要理解规则引擎的语法和算子细节。无论是在 Dashboard 还是外部 AI 工具里,用自然语言描述需求,系统生成规则,确认后直接生效:

"把温度从摄氏度转成华氏度,超过 80 度打告警标签,发到 alert/${device_id}"
    ↓ AI 生成规则
    ↓ 用户确认
    ↓ 规则生效,每条消息自动处理

规则引擎通过 MCP Server 暴露标准接口,AI 工具(Claude、Cursor、各类 Agent)可以直接操作,配置规则和写代码在同一个工作流里完成。

技术实现

三层架构

  • 执行引擎(Runtime):Rust 实现,纯无状态,每条消息独立处理。算子链顺序执行,前一个算子的输出是后一个的输入。CEL(cel-rust)负责 filterset 中的动态表达式求值,其余算子通过内置函数注册表实现。
  • 规则规范(Schema):一条 Rule = match(匹配条件)+ N 个算子 + sink(输出目标)+ on_error(失败策略)。JSON 描述,启动时编译,热更新不重启。多条 Rule 独立并行执行,不互相影响。
  • AI 层:Prompt + Schema 约束保证生成质量,Schema 校验兜底 AI 幻觉问题。MCP Server 暴露 create_ruletest_ruleexplain_rule 等工具,test_rule 是其中最关键的——用真实消息验证规则后再生效。

算子集合

  • 大部分算子依托现有 Rust crate(serde_jsonregexprostcsvflate2chrono 等),实现成本低
  • 需要自己实现的核心算子:unit_convert(工程单位换算)、bytes_decode(工业字节解析)、normalize(数据归一化)、threshold_alert(边缘告警标注)
  • 框架层:Rule 匹配、算子链调度、on_error 分支处理

与 EMQX 的本质差异

维度EMQX FlowRobustMQ
每条规则处理步骤最多 1 Function + 1 FilterN 个算子任意组合
协议绑定仅 MQTTMQTT + Kafka + 后续协议
工业数据支持bytes_decode / unit_convert
AI 接入自然语言生成 + MCP Server
边缘部署不支持单二进制,无外部依赖

规则引擎不是 RobustMQ 的附属功能,而是让它从"消息队列"变成"数据处理平台"的关键一步。

项目地址:https://github.com/robustmq/robustmq