RobustMQ Rule Engine Design Thinking
Background and Problem
RobustMQ supports multiple protocols including MQTT and Kafka, and will integrate more protocols in the future. Each protocol has data entry and exit points, and these points often require data cleansing and transformation:
- Raw byte streams reported by MQTT devices need to be parsed into structured data before routing
- When Kafka messages arrive, they need field filtering and format conversion before being written to storage or forwarded to consumers
- In Connector scenarios (similar to Kafka Connect), the data formats on the Source and Sink ends are usually inconsistent, requiring mapping and transformation in the middle
Existing solutions either require users to write their own code connecting to a processing framework (Flink, Kafka Streams), or rely on each broker's own rule engine — but these engines are protocol-bound: EMQX rules can only run in EMQX, Kafka Streams can only process Kafka messages.
RobustMQ's approach is: build a unified stateless rule engine at the kernel level that covers data processing needs across all protocol channels. The goal is not to build a general-purpose stream computing framework like Flink, but only to handle stateless data cleansing and transformation — which is sufficient for 80% of use cases.
Design Boundaries
Making explicit what we will and won't do:
Will do:
- Stateless per-message processing: filtering, field transformation, type conversion, format conversion, routing
- Coverage of all data ingress/egress points for MQTT, Kafka, and future protocols
- Middle transformation layer for Connector scenarios (Source → Transform → Sink)
- Lightweight, runnable on edge devices, single binary with no external dependencies
Will not do:
- Stateful stream computing (window aggregation, Join, CEP) — that's Flink's domain
- Cross-message correlation and accumulation — left to external processing frameworks
This boundary keeps the engine's complexity manageable while still covering the majority of real-world scenarios.
Overall Architecture
The rule engine is structured in three layers:
MQTT Channel Kafka Channel Future Protocols
│ │ │
└────────────────┴─────────────────┘
│
┌─────────▼──────────┐
│ Rule Engine (mount point) │ ← rules can be mounted on every protocol channel
└─────────┬──────────┘
│
┌────────────────┼─────────────────┐
▼ ▼ ▼
   MQTT Sink        Kafka Sink       Connector Sink

┌─────────────────────────────────────┐
│ AI Layer (natural language → rule generation) │
│ MCP Server / Chat / Dashboard │
├─────────────────────────────────────┤
│ Rule Schema Layer │
│ Operator set + composition + JSON description │
├─────────────────────────────────────┤
│ Execution Engine Layer (Runtime) │
│ Rust implementation + CEL expressions + function registry │
└─────────────────────────────────────┘

Execution Engine Layer: Pure computation, stateless. Each incoming message passes through a set of operators in sequence and outputs a transformed message or is discarded. No dependency on external services; can be compiled to a single binary.
Rule Schema Layer: Defines what the system can do, and is the input protocol for the execution engine. A rule is an ordered list of operators, where each operator has a well-defined type, parameters, and semantics. Expressed in JSON, configurable through the Dashboard or delivered via API.
AI Layer: Users describe requirements in natural language, and AI translates them into rule JSON. After user confirmation, the rule is deployed for execution. AI is not involved in execution — it only handles generation.
Execution Engine Layer (Runtime)
Core Model
The execution engine's core model is minimal: a message comes in, passes through an ordered set of operators, and outputs one (or zero) messages.
Message In
│
▼
┌──────────┐
│ Op 1 │ filter: payload.temp > 0
├──────────┤
│ Op 2 │ set: payload.temp_f = payload.temp * 1.8 + 32
├──────────┤
│ Op 3 │ delete: payload.raw_bytes
├──────────┤
│ Op 4 │ route: topic = "processed/${device_id}"
└──────────┘
│
▼
Message Out (or Drop)

If any filter operator returns false, the message is immediately discarded and subsequent operators are not executed. This model is purely functional, with no side effects; each message is processed completely independently.
Expression Engine: CEL's Role
CEL is not the entirety of the Runtime — it is responsible for only one thing: dynamic expression evaluation. It appears in exactly two operators: the condition expression of filter and the field-calculation expression of set.
# filter's expr field
payload.temp > 0 && payload.device_id != ""
# set's expr field (computing a new field value)
payload.temp * 1.8 + 32
string(payload.ts) + "_" + payload.device_id

Operators like rename, delete, keep_only, bytes_decode, unit_convert, and csv_decode have nothing to do with CEL — they are all implemented as fixed logic directly in Rust.
The rationale for choosing CEL (Common Expression Language): it is open-sourced by Google, has a Rust implementation (cel-rust), and is widely adopted by Kubernetes, Envoy, and Firebase; it is type-safe and sandbox-isolated (cannot execute arbitrary code), and AI generates CEL syntax with high quality thanks to abundant training data.
Message Model
The engine uses a unified structured message object internally, independent of specific protocol formats:
Message {
topic: String, // message routing key
payload: Map<String, Any>, // structured content, already decoded
headers: Map<String, String>, // protocol headers
metadata: Map<String, Any>, // source protocol, timestamp, and other metadata
}

Each protocol layer is responsible for converting the raw format to this structure before messages enter the rule engine. The rule engine has no awareness of protocol details. This is also the foundation for multiple protocols sharing the same set of operators.
Operator Implementation Categories
The main body of work in the Runtime is implementing 50+ operators, but not every one needs to be written from scratch. By implementation method, there are three categories:
Use existing crates directly, almost no code needed
| Operator | Dependency Crate |
|---|---|
| JSON field access, multi-level path extraction | serde_json (.pointer("/a/b/c")) |
| JSONPath queries | jsonpath-rust or serde_json_path |
| jq syntax | jaq (pure Rust implementation) |
| Regex matching/extraction | regex |
| Base64 | base64 |
| Gzip/compression | flate2 |
| Protobuf | prost |
| Avro | apache-avro |
| CSV | csv |
| MessagePack | rmp-serde |
| Time handling | chrono |
| Hash/HMAC | sha2 + hmac |
Thin wrapper, write a small amount of glue code
- String operations (trim/split/replace/upper/lower) → Rust standard library natively supports these; wrap them into the operator interface
- Type conversion (to_int/to_float) → standard library parse() with error handling
- Math functions → standard library f64 methods
Need to implement logic from scratch
- unit_convert — maintain an engineering unit mapping table and conversion formulas
- bytes_decode — parse raw byte streams in formats like int16_be / float32_le
- normalize — a single formula, but needs custom wrapping
- threshold_alert — logic for injecting labels when thresholds are exceeded
- template_render — ${field} syntax substitution
- moving_avg — sliding average with simple state (an edge case)
- Framework layer: rule matching, operator chain scheduling, on_error branch handling
The operators that truly need logic written from scratch are not many — mainly the edge-specific ones and the framework layer itself.
Performance Considerations
- Rules are compiled on first load (CEL expressions pre-compiled, functions registered); at runtime only already-compiled instructions are executed, with no parsing overhead
- The operator chain is a Vec<Box<dyn Operator>>, executed sequentially, with no overhead beyond dynamic dispatch
- Target: single-core processing capacity should not fall below the throughput ceiling of the protocol layer itself; the rule engine should not become a bottleneck
Rule Schema Layer
Rule Structure
A complete Rule structure consists of four parts: match conditions, N operators, output target, and failure handling strategy.
{
"id": "rule-001",
"name": "Temperature Unit Conversion",
"match": {
"topics": ["devices/+/telemetry"],
"protocols": ["mqtt"]
},
"ops": [
{ "type": "filter", "expr": "payload.temp != null" },
{ "type": "set", "field": "payload.temp_f", "expr": "payload.temp * 1.8 + 32" },
{ "type": "unit_convert", "field": "payload.pressure", "from": "Pa", "to": "bar" },
{ "type": "delete", "fields": ["payload.raw"] },
{ "type": "threshold_alert", "field": "payload.temp_f", "max": 176, "label": "overheat" }
],
"sink": {
"type": "topic",
"target": "processed/${metadata.device_id}"
},
"on_error": {
"strategy": "deadletter",
"topic": "$rule/rule-001/deadletter"
}
}

ops is an ordered operator chain with no limit on the number of operators; the output of one operator becomes the input of the next. If any filter returns false, the entire chain terminates and the message is discarded — this does not trigger on_error (a filter returning false is normal filtering behavior, not an error).
sink output targets support multiple types:
- topic → publish to another Topic (internal forwarding)
- drop → silently discard (filtering only, no output needed)
- webhook → send to an external HTTP endpoint
- storage → write to persistent storage (future extension)

on_error handles cases where operator execution fails (field not found, type conversion failure, byte format error, etc.):
- drop → silently discard, no output (default)
- passthrough → skip the failed operator and continue processing downstream
- deadletter → send to a dedicated dead letter topic, preserving context for post-mortem analysis and redelivery
- abort → terminate processing and log the error

In production environments, deadletter is recommended: failed messages are not lost, ops teams can review the cause, and once the rule is fixed the messages can be redelivered.
Operator Type System
Each operator has a precise type definition with clearly typed parameters. This is the prerequisite for AI being able to reliably generate rules — vague operator definitions lead to uncertain semantics in AI-generated rules.
Operator
├── Filter { expr: CelExpr }
├── Set { field: FieldPath, expr: CelExpr }
├── Delete { fields: Vec<FieldPath> }
├── Rename { from: FieldPath, to: FieldPath }
├── KeepOnly { fields: Vec<FieldPath> }
├── Flatten { prefix: Option<String> }
├── Decode { field: FieldPath, codec: Codec } // json/protobuf/avro/csv/bytes
├── Encode { field: FieldPath, codec: Codec }
├── Convert { field: FieldPath, from: Unit, to: Unit }
├── Normalize { field: FieldPath, min: f64, max: f64 }
├── Alert { field: FieldPath, min: Option<f64>, max: Option<f64>, label: String }
└── Route { target: Template }

Execution Model for Multiple Rules
Multiple Rules execute independently and in parallel — they are not chained together. The same message can match multiple Rules simultaneously; each Rule independently receives a copy of the original message, executes independently, outputs independently, and they do not affect each other.
Original Message ──┬──▶ Rule-001 operator chain ──▶ output to processed/...
│
                   └──▶ Rule-002 operator chain ──▶ output to archive/...

Rule-001's output does not flow into Rule-002. If multi-step processing is needed, the approach is to chain multiple operators within a single Rule rather than connecting multiple Rules end-to-end.
If you truly need "Rule A's output as Rule B's input," you can have Rule A output to an intermediate Topic and Rule B match that Topic — simulating chained processing through Topic routing. But most scenarios are handled well enough with a multi-operator chain within a single Rule.
Schema Version Management
Rule schemas require version control. When the engine is upgraded with new operators, old rules can still execute correctly; when operators are deprecated, there are clear migration paths. Schema versions are decoupled from engine versions, so rule files can migrate between different engine versions.
Rule Storage and Distribution
Rules are stored through the Meta Service, reusing the existing cluster mechanism. When rules change, the Meta Service notifies all Broker nodes to hot-reload them — no restart required, and in-flight message processing is unaffected.
AI Layer (Natural Language → Rule Generation)
The Core Problem
The AI Layer needs to solve this problem: reliably translating users' natural language descriptions into rule JSON conforming to the Schema.
"Reliably" is the key word. AI-generated rules must not be ambiguous, must not generate operators the engine doesn't recognize, and must not produce rules with incorrect parameter types.
Implementation Approach
Prompt Engineering + Schema Constraints
The complete operator Schema (each operator's type, parameters, and semantic description) is injected into the Prompt, so AI can only use already-defined operators when generating rules, with parameter types constrained by the Schema. After generation, a Schema validation step runs before deployment — invalid rules do not enter the engine.
The benefit of this approach: AI hallucination problems are caught by Schema validation. Even if an incorrect operator name or parameter type is generated, validation will intercept it and return an error, preventing runtime exceptions.
Context Augmentation
When users describe requirements, they often don't specify complete field paths — they just say "the temperature field." The AI layer needs to infer actual field names by combining the current Topic's sample messages (users can upload an example message), generating precise payload.temperature rather than the vague temperature.
Confirmation Dialog
After AI generates a rule, it displays a human-readable description (not JSON) for user confirmation:
The following rule will be created:
1. Filter: discard messages where the temp field is null
2. Convert: convert temp (Celsius) to Fahrenheit, write to temp_f field
3. Alert: mark alert=overheat in messages where temp_f exceeds 176°F
4. Route: output to processed/${device_id}
Confirm activation? [Yes / Modify]

After user confirmation, the rule JSON is written to the Meta Service in the background. Users never need to see the JSON.
MCP Server Interface
Core tools exposed by the MCP Server:
| Tool | Description |
|---|---|
| create_rule | Create a rule from a natural language description |
| list_rules | Query the list of currently active rules |
| update_rule | Modify a rule (supports natural language description of changes) |
| delete_rule | Delete a rule |
| test_rule | Test a rule with a sample message and return the output |
| explain_rule | Translate rule JSON into a human-readable description |
test_rule is the most valuable tool among them: before a rule takes effect, run it against a real message to see if the output matches expectations, reducing the risk of misconfigured rules making it to production.
Stateless Operator Set
Below is the complete set of operators planned for implementation. Nearly all are stateless and process each message independently; the only exceptions are the edge-specific moving_avg and deduplicate, which keep small node-local state.
| Category | Operator | Description |
|---|---|---|
| Filtering & Conditions | filter | Conditional filtering, CEL expression, supports AND/OR/NOT/comparison/IN/BETWEEN |
| | coalesce | Return the first non-null value |
| | case_when | Conditional branching, output different field values based on conditions |
| Field Operations | select | Select specified fields |
| | rename | Rename a field |
| | set | Add or overwrite a field; value supports CEL expressions |
| | delete | Delete a field |
| | keep_only | Keep only specified fields, delete all others |
| | flatten | Flatten nested JSON into top-level fields |
| | nest | Pack multiple fields into a nested structure |
| | merge | Merge two Map objects |
| Type Conversion | to_int / to_float / to_string / to_bool | Basic type conversion |
| | float(x, decimals) | Round a float to specified decimal places |
| String | trim / upper / lower | Strip whitespace, case conversion |
| | split / concat / replace | Split, concatenate, replace |
| | regex_match / regex_extract | Regex matching and capture group extraction |
| | starts_with / ends_with / contains | Containment checks |
| | url_encode / url_decode | URL encoding/decoding |
| | template_render | String template rendering, supports ${field} syntax |
| JSON | json_decode / json_encode | JSON serialization/deserialization |
| | map_get / map_put / map_keys / map_values | Map field read/write |
| | json_path | JSONPath queries, $.store.book[*].author |
| | jq | Full jq syntax support for complex JSON transformations |
| Arrays | map_array / filter_array | Apply a function or filter to each array element |
| | flatten_array / sort_array | Flatten nested arrays, sort |
| | first / last / nth | Get the first, last, or Nth element |
| | zip | Merge two arrays into an array of objects |
| Math | abs / ceil / floor / round / sqrt | Basic math operations |
| | log / log10 / exp / power | Exponential and logarithmic functions |
| | clamp(x, min, max) | Interval clamping; out-of-range values take the boundary |
| | normalize(x, min, max) | Normalize to [0, 1], useful for AI training data preprocessing ⭐ |
| | unit_convert(x, from, to) | Engineering unit conversion with built-in unit library (°C/°F, Pa/bar, mV/V, etc.) ⭐ |
| Time | now_timestamp / now_rfc3339 | Current timestamp / RFC3339 string |
| | unix_ts_to_rfc3339 / rfc3339_to_unix_ts | Conversion between Unix timestamps and RFC3339 |
| | format_date | Format a date with a format string |
| | date_diff | Calculate the difference between two timestamps |
| | timezone_convert | Timezone conversion |
| Encoding/Decoding | base64_encode / base64_decode | Base64 |
| | bin2hexstr / hexstr2bin | Hex encoding/decoding |
| | gzip / gunzip | Compress/decompress |
| | protobuf_decode / protobuf_encode | Protobuf, requires a schema |
| | avro_decode / avro_encode | Avro |
| | msgpack_decode / msgpack_encode | MessagePack |
| | csv_decode / csv_encode | Conversion between CSV strings and structured data ⭐ |
| | bytes_decode(payload, format) | Parse raw byte streams by format; supports industrial data formats like int16_be, float32_le ⭐ |
| Hash/Security | md5 / sha256 / sha512 / hmac_sha256 | Hash and HMAC |
| | aes_encrypt / aes_decrypt | Symmetric encryption |
| | uuid4 | Generate a UUID |
| Edge-Specific | threshold_alert(x, min, max) | Automatically inject alert labels into messages when thresholds are exceeded ⭐ |
| | moving_avg(field, n) | N-point moving average, for simple edge-side noise reduction ⭐ |
| | deduplicate(field, ttl_ms) | Deduplication by field value, based on a local TTL sliding window ⭐ |
Operators marked with ⭐ are differentiated operators not found in EMQX's rule engine, targeting edge computing and AI scenarios.
User Access Methods
Core design principle: users should need to manually configure operators as little as possible.
The primary method of generating rules is natural language; manual configuration is only a fallback option.
1. Natural Language Generation (Primary Path)
Whether in the Dashboard or an external AI tool, users describe their needs in natural language. The system generates a rule, shows it to the user for confirmation, and it takes effect immediately after confirmation. The Dashboard has a built-in chat dialog — no need to jump to an external tool.
User input: convert payload.temp from Celsius to Fahrenheit; add alert=true if above 80 degrees
↓
AI generates rule JSON (displayed to user for preview)
↓
User confirms → rule is written and takes effect

2. MCP Server (Direct AI Tool Connection)
RobustMQ exposes the rule engine management interface as an MCP Server, allowing any AI tool that supports MCP (Claude, Cursor, various AI Agents) to operate it directly. Users describe their requirements in their familiar AI tool, and rules are written to RobustMQ via MCP — no need to open the Dashboard.
This is the developer-facing scenario: configuring data processing rules directly in the IDE during development, within the same workflow as writing code.
3. Direct API Submission (Fallback)
Submit rule JSON directly via HTTP API — suitable for scenarios requiring version management or CI/CD automation. Manually writing rule JSON is the last resort fallback; normal usage should not require this step.
All three methods run on the same underlying operator engine. The more scenarios that natural language can cover, the fewer situations where users need to manually deal with operator details — and that's also why the operator set needs precise semantics and comprehensive coverage: it is the foundation for AI-generated rules, not a user-facing interface for direct manipulation.
Comparison with Existing Solutions
| | EMQX Rule Engine | Kafka Streams | Flink | RobustMQ Rule Engine |
|---|---|---|---|---|
| Deployment | Bundled with Broker | Independent JVM service | Independent cluster | Built into kernel, universal for edge/cloud |
| Expressiveness | SQL dialect | Java DSL | Java/SQL | Operator set + CEL |
| Industrial data support | Weak | None | None | Built-in bytes_decode, unit_convert |
| AI-friendly | Weak | Weak | Weak | MCP Server + standardized operators |
| Edge deployment | Not supported | Not supported | Not supported | Single binary, no external dependencies |
| Best suited for | Simple MQTT filtering | Kafka stream computing | Complex stateful computation | IoT data cleansing, edge preprocessing |
RobustMQ's rule engine is not positioned to replace Flink, but to cover scenarios Flink can't reach: lightweight data cleansing on edge devices, preprocessing IoT data before it enters the Broker, format mapping in Connector middleware, and simple transformation needs where deploying an external framework is undesirable.
In other words: users can solve 80% of their data cleansing needs within RobustMQ itself, without needing to deploy a separate Flink cluster for this purpose.
Worth noting in particular: EMQX's Flow Designer supports at most one Function node + one Filter node per rule; multi-step processing either requires writing complex SQL or chaining multiple rules. RobustMQ's operator chain supports any N operators in sequence — more expressive and more intuitive to configure.
Summary
What We're Building
Build a unified stateless rule engine at the RobustMQ kernel level, covering data processing needs across all MQTT, Kafka, and future protocol channels. The goal is to enable users to complete 80% of data cleansing and transformation work without introducing any external framework.
What It Should Look Like
Users don't need to understand rule engine syntax or operator details. Whether in the Dashboard or an external AI tool, they describe their requirements in natural language, the system generates a rule, and it takes effect immediately after confirmation:
"Convert temperature from Celsius to Fahrenheit, add an alert label if above 80°C, send to alert/${device_id}"
↓ AI generates rule
↓ User confirms
↓ Rule takes effect, every message automatically processed

The rule engine exposes standard interfaces through an MCP Server, allowing AI tools (Claude, Cursor, various Agents) to operate it directly — configuring rules and writing code all within the same workflow.
Technical Implementation
Three-layer architecture:
- Execution Engine (Runtime): Rust implementation, purely stateless; each message is processed independently. Operators execute in sequence, with the output of each operator as the input to the next. CEL (cel-rust) handles dynamic expression evaluation in filter and set; other operators are implemented through a built-in function registry.
- Rule Schema: A Rule = match (match conditions) + N operators + sink (output target) + on_error (failure strategy). JSON description, compiled on load, hot-reloaded without restart. Multiple Rules execute independently in parallel without affecting each other.
- AI Layer: Prompt + Schema constraints ensure generation quality; Schema validation provides a backstop for AI hallucinations. The MCP Server exposes tools like create_rule, test_rule, and explain_rule. test_rule is the most critical — verify with a real message before activating.
Operator Set:
- Most operators leverage existing Rust crates (serde_json, regex, prost, csv, flate2, chrono, etc.) — low implementation cost
- Core operators needing custom implementation: unit_convert (engineering unit conversion), bytes_decode (industrial byte parsing), normalize (data normalization), threshold_alert (edge alert labeling)
- Framework layer: rule matching, operator chain scheduling, on_error branch handling
Key Differences from EMQX:
| Dimension | EMQX Flow | RobustMQ |
|---|---|---|
| Processing steps per rule | At most 1 Function + 1 Filter | N operators in any combination |
| Protocol binding | MQTT only | MQTT + Kafka + future protocols |
| Industrial data support | None | bytes_decode / unit_convert |
| AI integration | None | Natural language generation + MCP Server |
| Edge deployment | Not supported | Single binary, no external dependencies |
The rule engine is not an ancillary feature of RobustMQ — it is the key step that transforms it from a "message queue" into a "data processing platform."
Project URL: https://github.com/robustmq/robustmq
