Skip to content

规则引擎总览

RobustMQ 规则引擎用于在 Connector 内做轻量数据处理。
当前目标不是替代 Flink 这类流处理系统,而是覆盖 MQTT 场景里常见的简单加工任务,减少外部系统依赖。

源数据格式

源数据格式简要说明Demo(短)是否支持优先级
JSON Object单条结构化对象{"device":"d1","temp":36.5}
JSON Array多条对象数组[{"device":"d1"},{"device":"d2"}]
JSON Lines每行一个 JSON 对象{"d":"d1"}\n{"d":"d2"}
key=value 行日志键值对文本,按分隔符解析ts=2026-03-03 level=INFO temp=36.5
无 key 行日志固定列顺序文本2026-03-03 INFO d1 36.5
CSV逗号分隔表格文本ts,level,device,temp
纯文本非结构化或半结构化字符串device d1 temp 36.5
bytes(二进制)原始二进制负载0x89 0x50 0x4E 0x47 ...
Protobuf二进制协议,需要 schemaMessage<DeviceData>(binary)
XML标签化结构文本<msg><device>d1</device></msg>

算子列表

过滤与条件

算子作用典型场景完成
filterCEL 表达式条件过滤,不满足则丢弃消息仅保留温度 > 0 的数据,过滤心跳包
case_when多分支条件赋值,根据条件输出不同字段值code=1→status="ok", code=2→status="warn"
coalesce返回第一个非空字段值,可指定默认值字段可能缺失时提供兜底值

字段操作

算子作用典型场景完成
set新增字段或覆盖字段,值支持 CEL 表达式写入 tenant、计算 temp_f = temp * 1.8 + 32
delete删除一个或多个字段删除中间态字段、敏感字段、调试字段
rename字段重命名把异构设备字段名统一映射
keep_only只保留指定字段,其余全部删除脱敏后只保留必要字段再写存储
flatten嵌套 JSON 展平为顶层字段,支持自定义前缀{"a":{"b":1}}{"a.b":1}
nest将多个字段打包为嵌套结构lat / lon 合并为 location: {lat, lon}
merge合并两个 Map 对象,key 冲突时可配置覆盖策略用元数据扩充设备上报字段
expand把数组字段拆分为多条独立消息(1→N)payload.readings: [...] 每条拆成单独消息

类型转换与判断

算子作用典型场景完成
cast字段类型转换(to_int / to_float / to_string / to_bool字符串 "36.5" 转 float,"true" 转 bool
is_null / is_not_null判断字段是否为空filter 里的防御性校验
is_int / is_float / is_bool / is_str判断字段类型,返回 bool动态类型数据校验
is_array / is_map判断字段是否为数组或 Map,返回 bool上报格式校验

字符串

算子作用典型场景完成
trim / upper / lower去首尾空白、大小写转换设备 ID 标准化
split / concat / replace字符串分割、拼接、替换topic 路径拆分、字段值拼接
substr / strlen子串截取、字符串长度截断超长字段值
regex_match正则匹配,返回 boolfilter 里判断字段值格式
regex_extract正则提取捕获组,返回数组从日志字符串里提取 IP、时间戳
regex_replace正则替换脱敏手机号、清洗格式
template_render字符串模板渲染,支持 ${field} 语法拼接 "device-${device_id}-${region}"
url_encode / url_decodeURL 编解码处理含特殊字符的字段值

数学运算

算子作用典型场景完成
abs / ceil / floor / round绝对值、取整传感器值取整
sqrt / power / log / log10幂次与对数信号强度 dB 换算
clamp(x, min, max)区间截断,超出范围取边界值异常值截断防止下游越界
normalize(field, min, max)归一化到 [0,1],或自定义目标区间AI 训练数据预处理、异常分数标准化
unit_convert(field, from, to)工程单位换算,内置单位库(℃/℉、Pa/bar、mV/V 等)传感器单位标准化,无需外部转换代码
random()返回 [0,1) 随机数消息采样(只处理 10% 数据)

时间

算子作用典型场景完成
now_timestamp返回当前 Unix 时间戳(秒/毫秒)给消息注入处理时间
now_rfc3339返回当前时间的 RFC3339 字符串日志类消息的时间标注
unix_ts_to_rfc3339Unix 时间戳转 RFC3339 字符串设备时间戳格式化后写存储
rfc3339_to_unix_tsRFC3339 字符串转 Unix 时间戳统一时间字段为数值方便计算
format_date按格式字符串格式化日期时间输出 "2026-03-03 15:00:00" 格式
date_diff计算两个时间点的差值设备上报延迟、超时检测
timezone_convert时区转换全球设备统一转 UTC

JSON 操作

算子作用典型场景完成
json_decode / json_encodeJSON 字符串序列化/反序列化payload 是 JSON 字符串时先 decode
map_get / map_putMap 字段读写,支持嵌套路径安全读取可能不存在的嵌套字段
map_keys / map_values返回 Map 的所有 key 或 value 列表动态枚举字段
map_to_entriesMap 转为 [{key, value}] 数组把 map 转成可迭代结构
json_pathJSONPath 语法查询,如 $.store.book[*].author复杂嵌套 JSON 字段提取
jqjq 语法完整支持,复杂 JSON 变换一行 jq 替代多步字段操作

数组操作

算子作用典型场景完成
map_array对数组每个元素应用转换函数批量给数组里每条记录加字段
filter_array对数组按条件过滤元素过滤 readings 数组中的无效值
first / last / nth取数组首尾或第 N 个元素取最新一条读数
contains判断数组是否包含指定元素过滤设备类型在白名单内的消息
length返回数组长度判断批量上报条数

编解码

算子作用典型场景完成
base64_encode / base64_decodeBase64 编解码二进制内容转文本传输
bin2hexstr / hexstr2bin十六进制字符串与字节互转调试工业设备原始帧
gzip / gunzip压缩/解压压缩前写存储,或解压设备上报数据
csv_decode / csv_encodeCSV 字符串与结构化数据互转批量设备数据表格格式
bytes_decode(field, format)按格式解析原始字节流,支持 int16_be / float32_le 等工业协议格式Modbus/PLC 原始帧解析
protobuf_decode / protobuf_encodeProtobuf 编解码,需提供 schema工业设备 protobuf 上报
avro_decode / avro_encodeAvro 编解码Kafka 生态数据管道
msgpack_decode / msgpack_encodeMessagePack 编解码低功耗设备压缩上报

哈希与安全

算子作用典型场景完成
md5 / sha256 / sha512哈希计算内容摘要、去重标识
hmac_sha256HMAC-SHA256 签名消息完整性校验
aes_encrypt / aes_decryptAES 对称加密/解密字段级敏感数据加密
mask_field(field, strategy)字段脱敏(置空/置零/截断/哈希替换)手机号、坐标等隐私字段处理
uuid4生成 UUID v4给每条消息注入唯一 trace ID

边缘专属

算子作用典型场景完成
threshold_alert(field, min, max, label)字段值超阈值时自动注入告警标签温度超限自动打 alert=overheat
moving_avg(field, n)N 点移动平均(有限本地状态,重启丢失)边缘侧简单降噪
deduplicate(field, ttl_ms)按字段值去重,TTL 滑动窗口(有限本地状态)过滤设备重复上报

执行模型(基础版)

项目说明
规则表达规则以 JSON 数组表示
执行顺序数组有序,按顺序执行
数据流上一个算子的输出作为下一个算子的输入
异常处理任一环节失败按规则错误策略处理(如跳过或终止)

当前边界

基础版只聚焦“轻量、可预测、可验证”的链式处理:

能力项基础版状态
跨消息聚合不支持
窗口计算不支持
状态存储与回溯计算不支持

后续会在保持链路稳定的前提下,逐步扩展更多算子能力。