
RobustMQ Rule Engine Design Thinking

Background and Problem

RobustMQ supports multiple protocols including MQTT and Kafka, and will integrate more protocols in the future. Each protocol has data entry and exit points, and these points often require data cleansing and transformation:

  • Raw byte streams reported by MQTT devices need to be parsed into structured data before routing
  • When Kafka messages arrive, they need field filtering and format conversion before being written to storage or forwarded to consumers
  • In Connector scenarios (similar to Kafka Connect), the data formats on the Source and Sink ends are usually inconsistent, requiring mapping and transformation in the middle

Existing solutions either require users to write custom code against an external processing framework (Flink, Kafka Streams), or rely on each broker's built-in rule engine — but those engines are protocol-bound: EMQX rules run only in EMQX, and Kafka Streams processes only Kafka messages.

RobustMQ's approach is: build a unified stateless rule engine at the kernel level that covers data processing needs across all protocol channels. The goal is not to build a general-purpose stream computing framework like Flink, but only to handle stateless data cleansing and transformation — which is sufficient for 80% of use cases.

Design Boundaries

Making explicit what we will and won't do:

Will do:

  • Stateless per-message processing: filtering, field transformation, type conversion, format conversion, routing
  • Coverage of all data ingress/egress points for MQTT, Kafka, and future protocols
  • Middle transformation layer for Connector scenarios (Source → Transform → Sink)
  • Lightweight, runnable on edge devices, single binary with no external dependencies

Will not do:

  • Stateful stream computing (window aggregation, Join, CEP) — that's Flink's domain
  • Cross-message correlation and accumulation — left to external processing frameworks

This boundary keeps the engine's complexity manageable while still covering the majority of real-world scenarios.

Overall Architecture

The rule engine sits between every protocol channel and every output as a unified mount point:

  MQTT Channel    Kafka Channel    Future Protocols
       │                │                 │
       └────────────────┼─────────────────┘
                        │
             ┌──────────▼───────────┐
             │     Rule Engine      │  ← rules can be mounted on
             │    (mount point)     │    every protocol channel
             └──────────┬───────────┘
                        │
       ┌────────────────┼─────────────────┐
       ▼                ▼                 ▼
  MQTT Sink       Kafka Sink        Connector Sink

The engine itself is structured in three layers:

┌───────────────────────────────────────────────────────────┐
│ AI Layer (natural language → rule generation)             │
│ MCP Server / Chat / Dashboard                             │
├───────────────────────────────────────────────────────────┤
│ Rule Schema Layer                                         │
│ Operator set + composition + JSON description             │
├───────────────────────────────────────────────────────────┤
│ Execution Engine Layer (Runtime)                          │
│ Rust implementation + CEL expressions + function registry │
└───────────────────────────────────────────────────────────┘

Execution Engine Layer: Pure computation, stateless. Each incoming message passes through a set of operators in sequence and outputs a transformed message or is discarded. No dependency on external services; can be compiled to a single binary.

Rule Schema Layer: Defines what the system can do, and is the input protocol for the execution engine. A rule is an ordered list of operators, where each operator has a well-defined type, parameters, and semantics. Expressed in JSON, configurable through the Dashboard or delivered via API.

AI Layer: Users describe requirements in natural language, and AI translates them into rule JSON. After user confirmation, the rule is deployed for execution. AI is not involved in execution — it only handles generation.

Execution Engine Layer (Runtime)

Core Model

The execution engine's core model is minimal: a message comes in, passes through an ordered set of operators, and outputs one (or zero) messages.

Message In
     │
     ▼
┌──────────┐
│ Op 1     │  filter: payload.temp > 0
├──────────┤
│ Op 2     │  set: payload.temp_f = payload.temp * 1.8 + 32
├──────────┤
│ Op 3     │  delete: payload.raw_bytes
├──────────┤
│ Op 4     │  route: topic = "processed/${device_id}"
└──────────┘
     │
     ▼
Message Out (or Drop)

If any filter operator returns false, the message is immediately discarded and subsequent operators are not executed. This model is purely functional, with no side effects; each message is processed completely independently.
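
As a minimal sketch of this model (all names here are illustrative, not RobustMQ's actual API): each operator either passes the message on, possibly modified, or signals a drop, and the chain short-circuits on the first drop.

use std::collections::HashMap;

// Simplified stand-in for the engine's message type (the real model follows below).
type Payload = HashMap<String, f64>;

// Outcome of one operator: continue with the (possibly modified) message, or drop it.
enum OpResult {
    Continue(Payload),
    Drop,
}

trait Operator {
    fn apply(&self, msg: Payload) -> OpResult;
}

// Executes the ordered chain; a single Drop short-circuits the whole rule.
fn run_chain(ops: &[Box<dyn Operator>], mut msg: Payload) -> Option<Payload> {
    for op in ops {
        match op.apply(msg) {
            OpResult::Continue(next) => msg = next,
            OpResult::Drop => return None, // filter returned false: discard, skip the rest
        }
    }
    Some(msg)
}

// Example operator: the Op 1 filter above.
struct TempFilter;
impl Operator for TempFilter {
    fn apply(&self, msg: Payload) -> OpResult {
        if msg.get("temp").copied().unwrap_or(f64::NAN) > 0.0 {
            OpResult::Continue(msg)
        } else {
            OpResult::Drop
        }
    }
}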

Expression Engine: CEL's Role

CEL is not the entirety of the Runtime — it is responsible for only one thing: dynamic expression evaluation. It appears in exactly two operators: the condition expression of filter and the field-calculation expression of set.

# filter's expr field
payload.temp > 0 && payload.device_id != ""

# set's expr field (computing a new field value)
payload.temp * 1.8 + 32
string(payload.ts) + "_" + payload.device_id

Operators like rename, delete, keep_only, bytes_decode, unit_convert, and csv_decode have nothing to do with CEL — they are all implemented as fixed logic directly in Rust.

The rationale for choosing CEL (Common Expression Language): it is open-sourced by Google, has a Rust implementation (cel-rust), and is widely adopted by Kubernetes, Envoy, and Firebase; it is type-safe and sandbox-isolated (it cannot execute arbitrary code); and AI generates CEL syntax with high quality, thanks to abundant training data.
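
For a feel of the integration, here is a compile-once, evaluate-per-message sketch against the cel-interpreter crate from the cel-rust project (exact method signatures vary across crate versions; variables are bound flat here for brevity, whereas the engine would expose the whole payload map):

use cel_interpreter::{Context, Program};

fn main() {
    // Compile once at rule load time; reuse the compiled program for every message.
    let program = Program::compile(r#"temp > 0.0 && device_id != """#)
        .expect("invalid CEL expression");

    // Bind per-message variables into the evaluation context.
    let mut ctx = Context::default();
    ctx.add_variable("temp", 23.5).unwrap();
    ctx.add_variable("device_id", "sensor-01").unwrap();

    // Evaluation is sandboxed: no I/O, no arbitrary code execution.
    let verdict = program.execute(&ctx).expect("evaluation failed");
    println!("filter passes: {verdict:?}");
}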

Message Model

The engine uses a unified structured message object internally, independent of specific protocol formats:

Message {
    topic:    String,          // message routing key
    payload:  Map<String, Any>, // structured content, already decoded
    headers:  Map<String, String>, // protocol headers
    metadata: Map<String, Any>, // source protocol, timestamp, and other metadata
}

Each protocol layer is responsible for converting the raw format to this structure before messages enter the rule engine. The rule engine has no awareness of protocol details. This is also the foundation for multiple protocols sharing the same set of operators.
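
A direct Rust rendering of this model might look as follows (a sketch using serde_json::Value for the dynamic parts, not necessarily the engine's actual definition):

use std::collections::HashMap;
use serde_json::Value;

#[derive(Debug, Clone)]
pub struct Message {
    pub topic: String,                    // message routing key
    pub payload: HashMap<String, Value>,  // structured content, already decoded
    pub headers: HashMap<String, String>, // protocol headers
    pub metadata: HashMap<String, Value>, // source protocol, timestamp, etc.
}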

Operator Implementation Categories

The main body of work in the Runtime is implementing 50+ operators, but not every one needs to be written from scratch. By implementation method, there are three categories:

Use existing crates directly, almost no code needed

| Operator | Dependency Crate |
| --- | --- |
| JSON field access, multi-level path extraction | serde_json (.pointer("/a/b/c")) |
| JSONPath queries | jsonpath-rust or serde_json_path |
| jq syntax | jaq (pure Rust implementation) |
| Regex matching/extraction | regex |
| Base64 | base64 |
| Gzip/compression | flate2 |
| Protobuf | prost |
| Avro | apache-avro |
| CSV | csv |
| MessagePack | rmp-serde |
| Time handling | chrono |
| Hash/HMAC | sha2 + hmac |
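
For example, multi-level path extraction comes almost for free from serde_json's JSON Pointer support:

use serde_json::json;

fn main() {
    let msg = json!({ "device": { "sensors": { "temp": 23.5 } } });

    // RFC 6901 JSON Pointer: each segment is one nesting level.
    let temp = msg.pointer("/device/sensors/temp").and_then(|v| v.as_f64());
    assert_eq!(temp, Some(23.5));
}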

Thin wrapper, write a small amount of glue code

  • String operations (trim/split/replace/upper/lower) → Rust standard library natively supports these; wrap them into the operator interface
  • Type conversion (to_int/to_float) → standard library parse() with error handling (see the sketch after this list)
  • Math functions → standard library f64 methods
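
As an illustration, to_int is little more than the standard library's parse() plus the engine's error convention (a sketch; the OpError type is hypothetical):

#[derive(Debug)]
struct OpError(String);

// to_int: wrap str::parse() into the operator interface, preserving the cause.
fn to_int(raw: &str) -> Result<i64, OpError> {
    raw.trim()
        .parse::<i64>()
        .map_err(|e| OpError(format!("to_int({raw:?}): {e}")))
}

fn main() {
    assert_eq!(to_int(" 42 ").unwrap(), 42);
    assert!(to_int("4.2").is_err()); // the failure feeds into the rule's on_error strategy
}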

Need to implement logic from scratch

  • unit_convert — maintain an engineering unit mapping table and conversion formulas
  • bytes_decode — parse raw byte streams in formats like int16_be / float32_le (sketched after this list)
  • normalize — a single formula but needs custom wrapping
  • threshold_alert — logic for injecting labels when thresholds are exceeded
  • template_render — ${field} syntax substitution
  • moving_avg — sliding average; keeps minimal local state (the edge-specific exception to statelessness)
  • Framework layer: Rule matching, operator chain scheduling, on_error branch handling

The operators that truly need logic written from scratch are not many — mainly the edge-specific ones and the framework layer itself.
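
To make bytes_decode concrete: the core of int16_be / float32_le parsing is just from_be_bytes / from_le_bytes over fixed-width slices (a sketch of two of the planned formats; the real operator would dispatch on a format enum parsed from the rule JSON):

// Decode an int16 (big-endian) at a byte offset; None if the buffer is too short.
fn decode_int16_be(buf: &[u8], off: usize) -> Option<i16> {
    let bytes: [u8; 2] = buf.get(off..off + 2)?.try_into().ok()?;
    Some(i16::from_be_bytes(bytes))
}

// Decode a float32 (little-endian) at a byte offset.
fn decode_float32_le(buf: &[u8], off: usize) -> Option<f32> {
    let bytes: [u8; 4] = buf.get(off..off + 4)?.try_into().ok()?;
    Some(f32::from_le_bytes(bytes))
}

fn main() {
    let frame = [0x01, 0x2C, 0x00, 0x00, 0x20, 0x41]; // 300_i16 (BE) then 10.0_f32 (LE)
    assert_eq!(decode_int16_be(&frame, 0), Some(300));
    assert_eq!(decode_float32_le(&frame, 2), Some(10.0));
}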

Performance Considerations

  • Rules are compiled on first load (CEL expressions pre-compiled, functions registered); at runtime only already-compiled instructions are executed, with no parsing overhead
  • The operator chain is a Vec<Box<dyn Operator>>, executed sequentially, with no overhead beyond dynamic dispatch
  • Target: single-core processing capacity should not fall below the throughput ceiling of the protocol layer itself; the rule engine should not become a bottleneck

Rule Schema Layer

Rule Structure

A complete Rule structure consists of four parts: match conditions, N operators, output target, and failure handling strategy.

{
  "id": "rule-001",
  "name": "Temperature Unit Conversion",
  "match": {
    "topics": ["devices/+/telemetry"],
    "protocols": ["mqtt"]
  },
  "ops": [
    { "type": "filter",          "expr": "payload.temp != null" },
    { "type": "set",             "field": "payload.temp_f", "expr": "payload.temp * 1.8 + 32" },
    { "type": "unit_convert",    "field": "payload.pressure", "from": "Pa", "to": "bar" },
    { "type": "delete",          "fields": ["payload.raw"] },
    { "type": "threshold_alert", "field": "payload.temp_f", "max": 176, "label": "overheat" }
  ],
  "sink": {
    "type": "topic",
    "target": "processed/${metadata.device_id}"
  },
  "on_error": {
    "strategy": "deadletter",
    "topic": "$rule/rule-001/deadletter"
  }
}

ops is an ordered operator chain with no limit on the number of operators; the output of one operator becomes the input of the next. If any filter returns false, the entire chain terminates and the message is discarded — this does not trigger on_error (a filter failure is normal filtering behavior, not an error).

sink output targets support multiple types:

topic     → publish to another Topic (internal forwarding)
drop      → silently discard (filtering only, no output needed)
webhook   → send to an external HTTP endpoint
storage   → write to persistent storage (future extension)

on_error specifies failure handling for cases where operator execution fails (field not found, type conversion failure, byte format error, etc.):

drop        → silently discard, no output (default)
passthrough → skip the failed operator and continue processing downstream
deadletter  → send to a dedicated dead letter topic, preserving context for post-mortem analysis and redelivery
abort       → terminate and log the error

In production environments, deadletter is recommended: failed messages are not lost, ops teams can review the cause, and after fixing the rule they can be redelivered.
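
A sketch of how the chain might dispatch these four strategies when an operator fails (all names here are illustrative, not the actual implementation):

// The four failure strategies from the rule's on_error block.
enum OnError {
    Drop,
    Passthrough,
    DeadLetter { topic: String },
    Abort,
}

struct Message; // payload, headers, etc. elided

// Returning Some(msg) means "continue with the next operator".
fn apply_on_error(policy: &OnError, msg: Message, err: &str) -> Option<Message> {
    match policy {
        OnError::Drop => None,             // silently discard (default)
        OnError::Passthrough => Some(msg), // skip the failed operator, keep going
        OnError::DeadLetter { topic } => {
            // Hypothetical publish: preserve the message plus error context on
            // e.g. $rule/rule-001/deadletter for post-mortem analysis and redelivery.
            publish(topic, msg, err);
            None
        }
        OnError::Abort => {
            eprintln!("rule aborted: {err}"); // terminate and log
            None
        }
    }
}

fn publish(_topic: &str, _msg: Message, _err: &str) { /* broker-specific */ }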

Operator Type System

Each operator has a precise type definition with clearly typed parameters. This is the prerequisite for AI being able to reliably generate rules — vague operator definitions lead to uncertain semantics in AI-generated rules.

Operator
  ├── Filter    { expr: CelExpr }
  ├── Set       { field: FieldPath, expr: CelExpr }
  ├── Delete    { fields: Vec<FieldPath> }
  ├── Rename    { from: FieldPath, to: FieldPath }
  ├── KeepOnly  { fields: Vec<FieldPath> }
  ├── Flatten   { prefix: Option<String> }
  ├── Decode    { field: FieldPath, codec: Codec }   // json/protobuf/avro/csv/bytes
  ├── Encode    { field: FieldPath, codec: Codec }
  ├── Convert   { field: FieldPath, from: Unit, to: Unit }
  ├── Normalize { field: FieldPath, min: f64, max: f64 }
  ├── Alert     { field: FieldPath, min: Option<f64>, max: Option<f64>, label: String }
  └── Route     { target: Template }
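
Because every operator is a fixed type tag plus typed parameters, the tree above maps naturally onto a serde-tagged Rust enum, and parsing a rule's ops array becomes a single deserialization step (a sketch with a few variants; FieldPath is simplified to a string here):

use serde::Deserialize;

type FieldPath = String; // e.g. "payload.temp_f"; a real implementation may parse this further

#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum Operator {
    Filter { expr: String },
    Set { field: FieldPath, expr: String },
    Delete { fields: Vec<FieldPath> },
    Rename { from: FieldPath, to: FieldPath },
    KeepOnly { fields: Vec<FieldPath> },
    // ...remaining variants elided
}

fn main() -> Result<(), serde_json::Error> {
    let ops: Vec<Operator> = serde_json::from_str(
        r#"[
            { "type": "filter", "expr": "payload.temp != null" },
            { "type": "delete", "fields": ["payload.raw"] }
        ]"#,
    )?;
    println!("{ops:?}");
    Ok(())
}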

Execution Model for Multiple Rules

Multiple Rules execute independently and in parallel — they are not chained together. The same message can match multiple Rules simultaneously; each Rule independently receives a copy of the original message, executes independently, outputs independently, and they do not affect each other.

Original Message ──┬──▶ Rule-001 operator chain ──▶ output to processed/...
                   │
                   └──▶ Rule-002 operator chain ──▶ output to archive/...

Rule-001's output does not flow into Rule-002. If multi-step processing is needed, the approach is to chain multiple operators within a single Rule rather than connecting multiple Rules end-to-end.

If you truly need "Rule A's output as Rule B's input," you can have Rule A output to an intermediate Topic and Rule B match that Topic — simulating chained processing through Topic routing. But most scenarios are handled well enough with a multi-operator chain within a single Rule.
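
Operationally the dispatch is a simple fan-out: every matching Rule gets its own clone of the original message (a sketch with heavily simplified types):

#[derive(Clone)]
struct Message { topic: String }

struct Rule { id: String }

impl Rule {
    fn matches(&self, _msg: &Message) -> bool { true }           // topic/protocol match elided
    fn run(&self, msg: Message) -> Option<Message> { Some(msg) } // operator chain elided
}

// One rule's output never feeds another rule's input.
fn dispatch(rules: &[Rule], msg: &Message) {
    for rule in rules.iter().filter(|r| r.matches(msg)) {
        if let Some(out) = rule.run(msg.clone()) {
            println!("rule {} produced output on {}", rule.id, out.topic);
        }
    }
}

fn main() {
    let rules = vec![Rule { id: "rule-001".into() }, Rule { id: "rule-002".into() }];
    dispatch(&rules, &Message { topic: "devices/a/telemetry".into() });
}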

Schema Version Management

Rule schemas require version control. When the engine is upgraded with new operators, old rules can still execute correctly; when operators are deprecated, there are clear migration paths. Schema versions are decoupled from engine versions, so rule files can migrate between different engine versions.

Rule Storage and Distribution

Rules are stored through the Meta Service, reusing the existing cluster mechanism. When rules change, the Meta Service notifies all Broker nodes to hot-reload them — no restart required, and in-flight message processing is unaffected.
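
Hot reload without disturbing in-flight messages is the classic read-mostly atomic swap. A sketch using the arc-swap crate (an implementation assumption for illustration, not a statement about RobustMQ's actual internals):

use std::sync::Arc;
use arc_swap::ArcSwap;

struct RuleSet { version: u64 } // compiled rules elided

fn main() {
    // Readers take a cheap snapshot per message; the swap is atomic, so
    // in-flight messages keep the rule set they started with.
    let rules = ArcSwap::from_pointee(RuleSet { version: 1 });

    let snapshot = rules.load(); // per-message read path
    println!("processing with rule set v{}", snapshot.version);

    // A Meta Service change notification arrives: publish the new compiled set.
    rules.store(Arc::new(RuleSet { version: 2 }));
    println!("now serving v{}", rules.load().version);
}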


AI Layer (Natural Language → Rule Generation)

The Core Problem

The AI Layer needs to solve this problem: reliably translating users' natural language descriptions into rule JSON conforming to the Schema.

"Reliably" is the key word. AI-generated rules must not be ambiguous, must not generate operators the engine doesn't recognize, and must not produce rules with incorrect parameter types.

Implementation Approach

Prompt Engineering + Schema Constraints

The complete operator Schema (each operator's type, parameters, and semantic description) is injected into the Prompt, so AI can only use already-defined operators when generating rules, with parameter types constrained by the Schema. After generation, a Schema validation step runs before deployment — invalid rules do not enter the engine.

The benefit of this approach: AI hallucination problems are caught by Schema validation. Even if an incorrect operator name or parameter type is generated, validation will intercept it and return an error, preventing runtime exceptions.

Context Augmentation

When users describe requirements, they often don't specify complete field paths — they just say "the temperature field." The AI layer needs to infer actual field names by combining the current Topic's sample messages (users can upload an example message), generating precise payload.temperature rather than the vague temperature.

Confirmation Dialog

After AI generates a rule, it displays a human-readable description (not JSON) for user confirmation:

The following rule will be created:
  1. Filter: discard messages where the temp field is null
  2. Convert: convert temp (Celsius) to Fahrenheit, write to temp_f field
  3. Alert: mark alert=overheat in messages where temp_f exceeds 176°F
  4. Route: output to processed/${device_id}

Confirm activation? [Yes / Modify]

After user confirmation, the rule JSON is written to the Meta Service in the background. Users never need to see the JSON.

MCP Server Interface

Core tools exposed by the MCP Server:

| Tool | Description |
| --- | --- |
| create_rule | Create a rule from a natural language description |
| list_rules | Query the list of currently active rules |
| update_rule | Modify a rule (supports natural language descriptions of changes) |
| delete_rule | Delete a rule |
| test_rule | Test a rule with a sample message and return the output |
| explain_rule | Translate rule JSON into a human-readable description |

test_rule is the most valuable tool among them: before a rule takes effect, run it against a real message to see if the output matches expectations, reducing the risk of misconfigured rules making it to production.

Stateless Operator Set

Below is the complete set of operators planned for implementation. All process each message independently and are stateless, with the noted edge-specific exceptions (moving_avg and deduplicate keep minimal local state).

| Category | Operator | Description |
| --- | --- | --- |
| Filtering & Conditions | filter | Conditional filtering, CEL expression; supports AND/OR/NOT, comparison, IN, BETWEEN |
| | coalesce | Return the first non-null value |
| | case_when | Conditional branching; output different field values based on conditions |
| Field Operations | select | Select specified fields |
| | rename | Rename a field |
| | set | Add or overwrite a field; value supports CEL expressions |
| | delete | Delete a field |
| | keep_only | Keep only specified fields, delete all others |
| | flatten | Flatten nested JSON into top-level fields |
| | nest | Pack multiple fields into a nested structure |
| | merge | Merge two Map objects |
| Type Conversion | to_int / to_float / to_string / to_bool | Basic type conversion |
| | float(x, decimals) | Round a float to specified decimal places |
| String | trim / upper / lower | Strip whitespace, case conversion |
| | split / concat / replace | Split, concatenate, replace |
| | regex_match / regex_extract | Regex matching and capture group extraction |
| | starts_with / ends_with / contains | Containment checks |
| | url_encode / url_decode | URL encoding/decoding |
| | template_render | String template rendering, supports ${field} syntax |
| JSON | json_decode / json_encode | JSON serialization/deserialization |
| | map_get / map_put / map_keys / map_values | Map field read/write |
| | json_path | JSONPath queries, $.store.book[*].author |
| | jq | Full jq syntax support for complex JSON transformations |
| Arrays | map_array / filter_array | Apply a function or filter to each array element |
| | flatten_array / sort_array | Flatten nested arrays, sort |
| | first / last / nth | Get first, last, or Nth element |
| | zip | Merge two arrays into an array of objects |
| Math | abs / ceil / floor / round / sqrt | Basic math operations |
| | log / log10 / exp / power | Exponential and logarithmic functions |
| | clamp(x, min, max) | Interval clamping; out-of-range values take the boundary |
| | normalize(x, min, max) | Normalize to [0, 1], useful for AI training data preprocessing ⭐ |
| | unit_convert(x, from, to) | Engineering unit conversion with built-in unit library (°C/°F, Pa/bar, mV/V, etc.) ⭐ |
| Time | now_timestamp / now_rfc3339 | Current timestamp / RFC3339 string |
| | unix_ts_to_rfc3339 / rfc3339_to_unix_ts | Conversion between timestamps and RFC3339 |
| | format_date | Format a date with a format string |
| | date_diff | Calculate the difference between two timestamps |
| | timezone_convert | Timezone conversion |
| Encoding/Decoding | base64_encode / base64_decode | Base64 |
| | bin2hexstr / hexstr2bin | Hex encoding/decoding |
| | gzip / gunzip | Compress/decompress |
| | protobuf_decode / protobuf_encode | Protobuf, requires schema |
| | avro_decode / avro_encode | Avro |
| | msgpack_decode / msgpack_encode | MessagePack |
| | csv_decode / csv_encode | Conversion between CSV strings and structured data ⭐ |
| | bytes_decode(payload, format) | Parse raw byte streams by format; supports industrial data formats like int16_be, float32_le |
| Hash/Security | md5 / sha256 / sha512 / hmac_sha256 | Hash and HMAC |
| | aes_encrypt / aes_decrypt | Symmetric encryption |
| | uuid4 | Generate a UUID |
| Edge-Specific | threshold_alert(x, min, max) | Automatically inject alert labels into messages when thresholds are exceeded ⭐ |
| | moving_avg(field, n) | N-point moving average, for simple edge-side noise reduction ⭐ |
| | deduplicate(field, ttl_ms) | Deduplication by field value, based on a local TTL sliding window ⭐ |

Operators marked with ⭐ are differentiated operators not found in EMQX's rule engine, targeting edge computing and AI scenarios.
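
To pin down the semantics of two of the starred math operators, clamp and normalize are each essentially a one-liner (a sketch; here normalize clamps its input first so the output always lands in [0, 1]):

// clamp(x, min, max): out-of-range values take the boundary.
fn clamp(x: f64, min: f64, max: f64) -> f64 {
    x.max(min).min(max)
}

// normalize(x, min, max): map [min, max] linearly onto [0, 1].
fn normalize(x: f64, min: f64, max: f64) -> f64 {
    (clamp(x, min, max) - min) / (max - min)
}

fn main() {
    assert_eq!(clamp(120.0, 0.0, 100.0), 100.0);
    assert_eq!(normalize(25.0, 0.0, 100.0), 0.25);
}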

User Access Methods

Core design principle: users should need to manually configure operators as little as possible.

The primary method of generating rules is natural language; manual configuration is only a fallback option.

1. Natural Language Generation (Primary Path)

Whether in the Dashboard or an external AI tool, users describe their needs in natural language. The system generates a rule, shows it to the user for confirmation, and it takes effect immediately after confirmation. The Dashboard has a built-in chat dialog — no need to jump to an external tool.

User input: convert payload.temp from Celsius to Fahrenheit; add alert=true if above 80 degrees

AI generates rule JSON (displayed to user for preview)

User confirms → rule is written and takes effect

2. MCP Server (Direct AI Tool Connection)

RobustMQ exposes the rule engine management interface as an MCP Server, allowing any AI tool that supports MCP (Claude, Cursor, various AI Agents) to operate it directly. Users describe their requirements in their familiar AI tool, and rules are written to RobustMQ via MCP — no need to open the Dashboard.

This is the developer-facing scenario: configuring data processing rules directly in the IDE during development, within the same workflow as writing code.

3. Direct API Submission (Fallback)

Submit rule JSON directly via HTTP API — suitable for scenarios requiring version management or CI/CD automation. Manually writing rule JSON is the last resort fallback; normal usage should not require this step.


All three methods run on the same underlying operator engine. The more scenarios that natural language can cover, the fewer situations where users need to manually deal with operator details — and that's also why the operator set needs precise semantics and comprehensive coverage: it is the foundation for AI-generated rules, not a user-facing interface for direct manipulation.

Comparison with Existing Solutions

| | EMQX Rule Engine | Kafka Streams | Flink | RobustMQ Rule Engine |
| --- | --- | --- | --- | --- |
| Deployment | Bundled with Broker | Independent JVM service | Independent cluster | Built into kernel, universal for edge/cloud |
| Expressiveness | SQL dialect | Java DSL | Java/SQL | Operator set + CEL |
| Industrial data support | Weak | None | None | Built-in bytes_decode, unit_convert |
| AI-friendly | Weak | Weak | Weak | MCP Server + standardized operators |
| Edge deployment | Not supported | Not supported | Not supported | Single binary, no external dependencies |
| Best suited for | Simple MQTT filtering | Kafka stream computing | Complex stateful computation | IoT data cleansing, edge preprocessing |

RobustMQ's rule engine is not positioned to replace Flink, but to cover scenarios Flink can't reach: lightweight data cleansing on edge devices, preprocessing IoT data before it enters the Broker, format mapping in Connector middleware, and simple transformation needs where deploying an external framework is undesirable.

In other words: users can solve 80% of their data cleansing needs within RobustMQ itself, without needing to deploy a separate Flink cluster for this purpose.

Worth noting in particular: EMQX's Flow Designer supports at most one Function node + one Filter node per rule; multi-step processing either requires writing complex SQL or chaining multiple rules. RobustMQ's operator chain supports any N operators in sequence — more expressive and more intuitive to configure.


Summary

What We're Building

Build a unified stateless rule engine at the RobustMQ kernel level, covering data processing needs across all MQTT, Kafka, and future protocol channels. The goal is to enable users to complete 80% of data cleansing and transformation work without introducing any external framework.

What It Should Look Like

Users don't need to understand rule engine syntax or operator details. Whether in the Dashboard or an external AI tool, they describe their requirements in natural language, the system generates a rule, and it takes effect immediately after confirmation:

"Convert temperature from Celsius to Fahrenheit, add an alert label if above 80°C, send to alert/${device_id}"
    ↓ AI generates rule
    ↓ User confirms
    ↓ Rule takes effect, every message automatically processed

The rule engine exposes standard interfaces through an MCP Server, allowing AI tools (Claude, Cursor, various Agents) to operate it directly — configuring rules and writing code all within the same workflow.

Technical Implementation

Three-layer architecture:

  • Execution Engine (Runtime): Rust implementation, purely stateless; each message is processed independently. Operators execute in sequence, with the output of each operator as the input to the next. CEL (cel-rust) handles dynamic expression evaluation in filter and set; other operators are implemented through a built-in function registry.
  • Rule Schema: A Rule = match (match conditions) + N operators + sink (output target) + on_error (failure strategy). JSON description, compiled at startup, hot-reloaded without restart. Multiple Rules execute independently in parallel without affecting each other.
  • AI Layer: Prompt + Schema constraints ensure generation quality; Schema validation provides a backstop for AI hallucinations. The MCP Server exposes tools like create_rule, test_rule, and explain_rule. test_rule is the most critical — verify with a real message before activating.

Operator Set:

  • Most operators leverage existing Rust crates (serde_json, regex, prost, csv, flate2, chrono, etc.) — low implementation cost
  • Core operators needing custom implementation: unit_convert (engineering unit conversion), bytes_decode (industrial byte parsing), normalize (data normalization), threshold_alert (edge alert labeling)
  • Framework layer: Rule matching, operator chain scheduling, on_error branch handling

Key Differences from EMQX:

| Dimension | EMQX Flow | RobustMQ |
| --- | --- | --- |
| Processing steps per rule | At most 1 Function + 1 Filter | N operators in any combination |
| Protocol binding | MQTT only | MQTT + Kafka + future protocols |
| Industrial data support | None | bytes_decode / unit_convert |
| AI integration | None | Natural language generation + MCP Server |
| Edge deployment | Not supported | Single binary, no external dependencies |

The rule engine is not an ancillary feature of RobustMQ — it is the key step that transforms it from a "message queue" into a "data processing platform."

Project URL: https://github.com/robustmq/robustmq

🎉 Since you're already signed in to GitHub, why not give us a Star while you're at it! ⭐ Your support is our greatest motivation 🚀