RobustMQ 规则引擎设计思考
背景与问题
RobustMQ 支持 MQTT、Kafka 多协议,未来还会接入更多协议。每种协议都有数据进入和流出的节点,这些节点上往往需要做数据清洗和转换:
- MQTT 设备上报的原始字节流,需要解析成结构化数据再路由
- Kafka 消息进入时,需要字段过滤、格式转换后再写入存储或转发给消费者
- Connector 场景(类似 Kafka Connect)中,Source 和 Sink 两端的数据格式通常不一致,需要在中间做映射和转换
现有的解法要么是让用户自己写代码接一套处理框架(Flink、Kafka Streams),要么是用各家 Broker 自己的规则引擎——但这些引擎都是协议绑定的,EMQX 的规则只能用在 EMQX 上,Kafka Streams 只能处理 Kafka 消息。
RobustMQ 的思路是:在内核层面内置一套统一的无状态规则引擎,覆盖所有协议通道上的数据处理需求。不做 Flink 那样的通用流计算框架,只做无状态的数据清洗和转换——这件事在 80% 的场景里已经足够。
设计边界
明确做什么、不做什么:
做:
- 无状态的逐条消息处理:过滤、字段变换、类型转换、格式转换、路由
- 覆盖 MQTT、Kafka 及后续协议的所有数据进出节点
- 覆盖 Connector 场景(Source → Transform → Sink)的中间转换层
- 轻量,可在边缘设备上运行,单二进制无外部依赖
不做:
- 有状态的流计算(窗口聚合、Join、CEP)——这是 Flink 的领域
- 跨消息的关联和累积——留给外部处理框架
这个边界保证了引擎的复杂度可控,同时能覆盖大多数实际场景。
整体架构
规则引擎分三层:
MQTT Channel Kafka Channel Future Protocols
│ │ │
└────────────────┴─────────────────┘
│
┌─────────▼──────────┐
│ 规则引擎(挂载点) │ ← 每个协议通道都可挂载规则
└─────────┬──────────┘
│
┌────────────────┼─────────────────┐
▼ ▼ ▼
MQTT Sink Kafka Sink Connector Sink┌─────────────────────────────────────┐
│ AI 层(自然语言 → 规则生成) │
│ MCP Server / Chat / Dashboard │
├─────────────────────────────────────┤
│ 规则规范层(Schema) │
│ 算子集合 + 组合方式 + JSON 描述 │
├─────────────────────────────────────┤
│ 执行引擎层(Runtime) │
│ Rust 实现 + CEL 表达式 + 函数注册表 │
└─────────────────────────────────────┘执行引擎层:纯计算、无状态。每条消息进来,按顺序执行一组算子,输出变换后的消息或丢弃。不依赖外部服务,可编译为单一二进制。
规则规范层:定义系统能做什么,是执行引擎的输入协议。规则是算子的有序列表,每个算子有明确的类型、参数和语义。用 JSON 表达,可通过 Dashboard 配置或 API 下发。
AI 层:用自然语言描述需求,AI 将其翻译为规则 JSON,用户确认后下发执行。AI 不参与执行,只负责生成。
执行引擎层(Runtime)
核心模型
执行引擎的核心模型极简:一条消息进来,经过一组有序算子,输出一条(或零条)消息出去。
Message In
│
▼
┌──────────┐
│ Op 1 │ filter: payload.temp > 0
├──────────┤
│ Op 2 │ set: payload.temp_f = payload.temp * 1.8 + 32
├──────────┤
│ Op 3 │ delete: payload.raw_bytes
├──────────┤
│ Op 4 │ route: topic = "processed/${device_id}"
└──────────┘
│
▼
Message Out(或 Drop)任意一个 filter 算子返回 false,消息直接丢弃,后续算子不再执行。这个模型是纯函数式的,没有副作用,每条消息完全独立处理。
表达式引擎:CEL 的角色
CEL 不是 Runtime 的全部,只负责动态表达式求值这一件事,具体是两个算子:filter 的过滤条件,和 set 的字段计算表达式。
# filter 的 expr 字段
payload.temp > 0 && payload.device_id != ""
# set 的 expr 字段(计算新字段值)
payload.temp * 1.8 + 32
string(payload.ts) + "_" + payload.device_idrename、delete、keep_only、bytes_decode、unit_convert、csv_decode 这些算子和 CEL 完全无关,都是 Rust 直接实现的固定逻辑。
选择 CEL(Common Expression Language) 的理由:Google 开源,Rust 实现为 cel-rust,被 Kubernetes、Envoy、Firebase 广泛采用,类型安全、沙箱隔离(不能执行任意代码),且 AI 对 CEL 语法的生成质量高(训练数据丰富)。
消息模型
引擎内部统一使用结构化的消息对象,不依赖具体协议格式:
Message {
topic: String, // 消息路由键
payload: Map<String, Any>, // 结构化内容,已解码
headers: Map<String, String>, // 协议头部
metadata: Map<String, Any>, // 来源协议、时间戳等元信息
}各协议层在消息进入规则引擎前负责将原始格式转换为此结构,规则引擎不感知协议细节。这也是多协议共用同一套算子的基础。
算子实现分类
Runtime 的主体工作是实现 50+ 个算子,但不是每个都需要从零写。按实现方式分三类:
直接用现有 crate,几乎不用写代码
| 算子 | 依赖 crate |
|---|---|
| JSON 字段访问、多级路径提取 | serde_json(.pointer("/a/b/c")) |
| JSONPath 查询 | jsonpath-rust 或 serde_json_path |
| jq 语法 | jaq(纯 Rust 实现) |
| 正则匹配/提取 | regex |
| Base64 | base64 |
| Gzip/压缩 | flate2 |
| Protobuf | prost |
| Avro | apache-avro |
| CSV | csv |
| MessagePack | rmp-serde |
| 时间处理 | chrono |
| 哈希/HMAC | sha2 + hmac |
薄薄包一层,写少量胶水代码
- 字符串操作(trim/split/replace/upper/lower)→ Rust 标准库本身支持,包装成算子接口
- 类型转换(to_int/to_float)→ 标准库
parse(),加错误处理 - 数学函数 → 标准库
f64方法
需要自己实现逻辑的
unit_convert— 维护一个工程单位映射表和换算公式bytes_decode— 按 int16_be / float32_le 等格式解析原始字节流normalize— 一行公式,但需要自己封装threshold_alert— 超阈值注入标签的逻辑template_render—${field}语法替换moving_avg— 带简单状态的滑动平均(边缘场景)- 框架层:Rule 匹配、算子链调度、on_error 分支处理
真正需要从零写逻辑的算子不多,大头是边缘专属那几个和框架层本身。
性能考虑
- 规则在首次加载时编译(CEL 表达式预编译、函数注册),运行时只执行已编译的指令,无解析开销
- 算子链是
Vec<Box<dyn Operator>>,顺序执行,无动态调度以外的额外开销 - 目标是单核处理能力不低于协议层本身的吞吐上限,规则引擎不应成为瓶颈
规则规范层(Schema)
规则结构
一条 Rule 的完整结构由四部分组成:匹配条件、N 个算子、输出目标、失败处理策略。
{
"id": "rule-001",
"name": "温度单位转换",
"match": {
"topics": ["devices/+/telemetry"],
"protocols": ["mqtt"]
},
"ops": [
{ "type": "filter", "expr": "payload.temp != null" },
{ "type": "set", "field": "payload.temp_f", "expr": "payload.temp * 1.8 + 32" },
{ "type": "unit_convert", "field": "payload.pressure", "from": "Pa", "to": "bar" },
{ "type": "delete", "fields": ["payload.raw"] },
{ "type": "threshold_alert", "field": "payload.temp_f", "max": 176, "label": "overheat" }
],
"sink": {
"type": "topic",
"target": "processed/${metadata.device_id}"
},
"on_error": {
"strategy": "deadletter",
"topic": "$rule/rule-001/deadletter"
}
}ops 是有序的算子链,算子数量没有限制,前一个算子的输出是后一个算子的输入。任意一个 filter 返回 false,整条链终止,消息被丢弃,不触发 on_error(filter 失败是正常的过滤行为,不是错误)。
sink 输出目标支持多种类型:
topic → 发到另一个 Topic(内部流转)
drop → 静默丢弃(只做过滤,不需要输出)
webhook → 发到外部 HTTP 接口
storage → 写入持久化存储(后续扩展)on_error 失败处理,处理算子执行失败的情况(字段不存在、类型转换失败、字节格式错误等):
drop → 静默丢弃,不输出(默认)
passthrough → 跳过失败的算子,消息继续往后走
deadletter → 发到专用 dead letter topic,保留现场,可事后分析和重投
abort → 终止并记录错误日志生产环境推荐 deadletter:失败的消息不丢失,运维可查看原因,修复规则后重新投递。
算子的类型系统
每个算子有精确的类型定义,参数有明确的类型约束。这是 AI 能可靠生成规则的前提——模糊的算子定义会导致 AI 生成的规则语义不确定。
Operator
├── Filter { expr: CelExpr }
├── Set { field: FieldPath, expr: CelExpr }
├── Delete { fields: Vec<FieldPath> }
├── Rename { from: FieldPath, to: FieldPath }
├── KeepOnly { fields: Vec<FieldPath> }
├── Flatten { prefix: Option<String> }
├── Decode { field: FieldPath, codec: Codec } // json/protobuf/avro/csv/bytes
├── Encode { field: FieldPath, codec: Codec }
├── Convert { field: FieldPath, from: Unit, to: Unit }
├── Normalize { field: FieldPath, min: f64, max: f64 }
├── Alert { field: FieldPath, min: Option<f64>, max: Option<f64>, label: String }
└── Route { target: Template }多条 Rule 的执行模型
多条 Rule 之间是独立并行的,不是串联的。同一条消息可以同时命中多条 Rule,每条 Rule 独立拿到原始消息的副本,独立执行,独立输出,互不影响。
原始消息 ──┬──▶ Rule-001 算子链 ──▶ 输出到 processed/...
│
└──▶ Rule-002 算子链 ──▶ 输出到 archive/...Rule-001 的输出不会流入 Rule-002。如果需要多步处理,做法是在同一条 Rule 内部串联多个算子,而不是多条 Rule 首尾相接。
如果确实需要"Rule A 的输出作为 Rule B 的输入",可以让 Rule A 输出到一个中间 Topic,Rule B 匹配该 Topic——通过 Topic 路由模拟链式处理。但大多数场景用一条 Rule 内的多算子链就足够了。
Schema 的版本管理
规则 Schema 需要版本控制。引擎升级引入新算子时,旧规则仍然可以正确执行;废弃算子时有明确的迁移路径。Schema 版本和引擎版本解耦,规则文件可以在不同版本的引擎间迁移。
规则的存储与分发
规则通过 Meta Service 存储,和现有集群机制复用。规则变更时,Meta Service 通知所有 Broker 节点热更新,不需要重启,不影响正在处理的消息。
AI 层(自然语言 → 规则生成)
核心问题
AI 层要解决的问题是:把用户的自然语言描述,可靠地转换为符合 Schema 的规则 JSON。
"可靠"是关键词。AI 生成的规则不能有歧义,不能生成引擎不认识的算子,不能生成参数类型错误的规则。
实现思路
Prompt 工程 + Schema 约束
将完整的算子 Schema(每个算子的类型、参数、语义描述)注入到 Prompt 中,AI 在生成规则时只能使用已定义的算子,参数类型由 Schema 约束。生成完成后,在下发执行前做一次 Schema 校验,不合法的规则不会进入引擎。
这样的好处是:AI 的幻觉问题被 Schema 校验兜底,即使生成了错误的算子名或参数类型,校验会拦截并返回错误,不会产生运行时异常。
上下文增强
用户描述时往往不会指定字段的完整路径,只说"温度字段"。AI 层需要结合当前 Topic 的消息样本(用户可以上传一条示例消息)来推断实际字段名,生成精确的 payload.temperature 而不是模糊的 temperature。
交互确认
AI 生成规则后,展示人类可读的描述(不是 JSON)给用户确认:
将生成以下规则:
1. 过滤:丢弃 temp 字段为空的消息
2. 转换:将 temp(摄氏度)转换为华氏度,写入 temp_f 字段
3. 告警:temp_f 超过 176°F 时在消息中标记 alert=overheat
4. 路由:输出到 processed/${device_id}
确认生效?[是 / 修改]用户确认后,后台将规则 JSON 写入 Meta Service。用户全程不需要看到 JSON。
MCP Server 接口
MCP Server 暴露的核心工具:
| Tool | 说明 |
|---|---|
create_rule | 从自然语言描述创建规则 |
list_rules | 查询当前生效的规则列表 |
update_rule | 修改规则(支持自然语言描述变更) |
delete_rule | 删除规则 |
test_rule | 用一条示例消息测试规则,返回输出结果 |
explain_rule | 将规则 JSON 解释为人类可读描述 |
test_rule 是其中最有价值的工具:在规则生效前,用一条真实消息跑一遍,看输出是否符合预期,降低配置错误进生产的风险。
无状态算子集合
以下是计划实现的完整算子集合,全部无状态,每条消息独立处理。
| 类别 | 算子 | 说明 |
|---|---|---|
| 过滤与条件 | filter | 条件过滤,CEL 表达式,支持 AND/OR/NOT/比较/IN/BETWEEN |
coalesce | 返回第一个非空值 | |
case_when | 条件分支,按条件输出不同字段值 | |
| 字段操作 | select | 选取指定字段 |
rename | 字段重命名 | |
set | 新增或覆盖字段,值支持 CEL 表达式 | |
delete | 删除字段 | |
keep_only | 只保留指定字段,其余全删 | |
flatten | 嵌套 JSON 展平为顶层字段 | |
nest | 将多个字段打包为嵌套结构 | |
merge | 合并两个 Map 对象 | |
| 类型转换 | to_int / to_float / to_string / to_bool | 基本类型互转 |
float(x, decimals) | 浮点数保留指定小数位 | |
| 字符串 | trim / upper / lower | 去空白、大小写转换 |
split / concat / replace | 分割、拼接、替换 | |
regex_match / regex_extract | 正则匹配与捕获组提取 | |
starts_with / ends_with / contains | 包含判断 | |
url_encode / url_decode | URL 编解码 | |
template_render | 字符串模板渲染,支持 ${field} 语法 | |
| JSON | json_decode / json_encode | JSON 序列化/反序列化 |
map_get / map_put / map_keys / map_values | Map 字段读写 | |
json_path | JSONPath 查询,$.store.book[*].author | |
jq | jq 语法完整支持,复杂 JSON 变换 | |
| 数组 | map_array / filter_array | 对数组每个元素应用函数或条件过滤 |
flatten_array / sort_array | 展平嵌套数组、排序 | |
first / last / nth | 取首尾或第 N 个元素 | |
zip | 两个数组合并为对象数组 | |
| 数学 | abs / ceil / floor / round / sqrt | 基础数学运算 |
log / log10 / exp / power | 指数与对数 | |
clamp(x, min, max) | 区间截断,超出范围取边界值 | |
normalize(x, min, max) | 归一化到 [0, 1],AI 训练数据预处理 ⭐ | |
unit_convert(x, from, to) | 工程单位换算,内置单位库(℃/℉、Pa/bar、mV/V 等)⭐ | |
| 时间 | now_timestamp / now_rfc3339 | 当前时间戳/RFC3339 字符串 |
unix_ts_to_rfc3339 / rfc3339_to_unix_ts | 时间戳与 RFC3339 互转 | |
format_date | 按格式字符串格式化日期 | |
date_diff | 计算两个时间点的差值 | |
timezone_convert | 时区转换 | |
| 编解码 | base64_encode / base64_decode | Base64 |
bin2hexstr / hexstr2bin | Hex 编解码 | |
gzip / gunzip | 压缩/解压 | |
protobuf_decode / protobuf_encode | Protobuf,需提供 schema | |
avro_decode / avro_encode | Avro | |
msgpack_decode / msgpack_encode | MessagePack | |
csv_decode / csv_encode | CSV 字符串与结构化数据互转 ⭐ | |
bytes_decode(payload, format) | 按格式解析原始字节流,支持 int16_be、float32_le 等工业数据格式 ⭐ | |
| 哈希/安全 | md5 / sha256 / sha512 / hmac_sha256 | 哈希与 HMAC |
aes_encrypt / aes_decrypt | 对称加密 | |
uuid4 | 生成 UUID | |
| 边缘专属 | threshold_alert(x, min, max) | 超阈值自动在消息中注入告警标签 ⭐ |
moving_avg(field, n) | N 点移动平均,用于边缘侧简单降噪 ⭐ | |
deduplicate(field, ttl_ms) | 按字段值去重,基于本地 TTL 滑动窗口 ⭐ |
标注 ⭐ 的是 EMQX 规则引擎不具备、面向边缘和 AI 场景的差异化算子。
用户接入方式
核心设计原则:用户尽量不需要手动配置算子。
规则的主要生成方式是自然语言,手动配置只是兜底选项。
1. 自然语言生成(主路径)
无论是 Dashboard 还是外部 AI 工具,用户都通过自然语言描述需求,系统生成规则后展示给用户确认,确认后直接生效。Dashboard 内置对话框,不需要跳转到外部工具。
用户输入:把 payload.temp 从摄氏度转成华氏度,超过 80 度加 alert=true 字段
↓
AI 生成规则 JSON(展示给用户预览)
↓
用户确认 → 规则写入并生效2. MCP Server(AI 工具直连)
RobustMQ 将规则引擎管理接口暴露为 MCP Server,任何支持 MCP 的 AI 工具(Claude、Cursor、各类 AI Agent)都可以直接操作。用户在习惯的 AI 工具里描述需求,规则通过 MCP 写入 RobustMQ,不需要打开 Dashboard。
这是面向开发者的场景:在开发阶段直接在 IDE 里配置数据处理规则,和写代码在同一个工作流里。
3. API 直接下发(兜底)
通过 HTTP API 直接提交规则 JSON,适合需要版本管理或 CI/CD 自动化的场景。手动编写规则 JSON 是最后的兜底方式,正常使用不应该需要这一步。
三种方式底层执行同一套算子引擎。自然语言能覆盖的场景越多,用户需要手动接触算子细节的情况就越少——这也是算子集合需要语义精确、覆盖完整的原因:它是 AI 生成规则的基础,而不是用户直接操作的界面。
与现有方案的对比
| EMQX 规则引擎 | Kafka Streams | Flink | RobustMQ 规则引擎 | |
|---|---|---|---|---|
| 部署方式 | 与 Broker 绑定 | 独立 JVM 服务 | 独立集群 | 内核内置,边缘/云端通用 |
| 表达能力 | SQL 方言 | Java DSL | Java/SQL | 算子集合 + CEL |
| 工业数据支持 | 弱 | 无 | 无 | 内置 bytes_decode、unit_convert |
| AI 友好 | 弱 | 弱 | 弱 | MCP Server + 标准化算子 |
| 边缘部署 | 不支持 | 不支持 | 不支持 | 单二进制,无外部依赖 |
| 适合场景 | MQTT 简单过滤 | Kafka 流计算 | 复杂有状态计算 | IoT 数据清洗、边缘预处理 |
RobustMQ 规则引擎的定位不是替代 Flink,而是覆盖 Flink 触及不到的场景:边缘设备上的轻量数据清洗、IoT 数据进 Broker 前的预处理、Connector 中间层的格式映射、不想引入外部框架的简单转换需求。
换句话说:用户在 RobustMQ 内部就能解决 80% 的数据清洗需求,不需要为此单独部署一套 Flink 集群。
值得特别说明的是:EMQX 的 Flow Designer 每条规则最多只支持一个 Function 节点 + 一个 Filter 节点,多步处理要么写复杂 SQL,要么串联多条规则。RobustMQ 的算子链支持任意 N 个算子顺序组合,表达能力更强,配置更直观。
总结
我们要做什么
在 RobustMQ 内核层面内置一套统一的无状态规则引擎,覆盖 MQTT、Kafka 及后续所有协议通道上的数据处理需求。目标是让用户在不引入任何外部框架的情况下,完成 80% 的数据清洗和转换工作。
设想是什么样子
用户不需要理解规则引擎的语法和算子细节。无论是在 Dashboard 还是外部 AI 工具里,用自然语言描述需求,系统生成规则,确认后直接生效:
"把温度从摄氏度转成华氏度,超过 80 度打告警标签,发到 alert/${device_id}"
↓ AI 生成规则
↓ 用户确认
↓ 规则生效,每条消息自动处理规则引擎通过 MCP Server 暴露标准接口,AI 工具(Claude、Cursor、各类 Agent)可以直接操作,配置规则和写代码在同一个工作流里完成。
技术实现
三层架构:
- 执行引擎(Runtime):Rust 实现,纯无状态,每条消息独立处理。算子链顺序执行,前一个算子的输出是后一个的输入。CEL(
cel-rust)负责filter和set中的动态表达式求值,其余算子通过内置函数注册表实现。 - 规则规范(Schema):一条 Rule = match(匹配条件)+ N 个算子 + sink(输出目标)+ on_error(失败策略)。JSON 描述,启动时编译,热更新不重启。多条 Rule 独立并行执行,不互相影响。
- AI 层:Prompt + Schema 约束保证生成质量,Schema 校验兜底 AI 幻觉问题。MCP Server 暴露
create_rule、test_rule、explain_rule等工具,test_rule是其中最关键的——用真实消息验证规则后再生效。
算子集合:
- 大部分算子依托现有 Rust crate(
serde_json、regex、prost、csv、flate2、chrono等),实现成本低 - 需要自己实现的核心算子:
unit_convert(工程单位换算)、bytes_decode(工业字节解析)、normalize(数据归一化)、threshold_alert(边缘告警标注) - 框架层:Rule 匹配、算子链调度、on_error 分支处理
与 EMQX 的本质差异:
| 维度 | EMQX Flow | RobustMQ |
|---|---|---|
| 每条规则处理步骤 | 最多 1 Function + 1 Filter | N 个算子任意组合 |
| 协议绑定 | 仅 MQTT | MQTT + Kafka + 后续协议 |
| 工业数据支持 | 无 | bytes_decode / unit_convert |
| AI 接入 | 无 | 自然语言生成 + MCP Server |
| 边缘部署 | 不支持 | 单二进制,无外部依赖 |
规则引擎不是 RobustMQ 的附属功能,而是让它从"消息队列"变成"数据处理平台"的关键一步。
