RobustMQ Multi-Protocol Architecture: Protocol Parsing from MQTT to Kafka and AMQP
RobustMQ's core vision is to be an All-In-One MQ. Supporting multiple protocols is the foundation, and the challenge lies not in implementing protocol parsing itself, but in how to let different protocols share infrastructure while not interfering with each other. Protocols must both reuse common capabilities like connection management, rate limiting, and storage, and maintain their own independence — a change in one protocol must not destabilize others.
This article documents RobustMQ's practice along this path. After two years of refinement, MQTT has gradually matured. We recently completed the framework integration of the Kafka and AMQP protocols — while the protocol logic itself is still a skeleton, the entire multi-protocol architecture is now running. Services for both protocols start normally and listen on their ports, with almost no invasive changes to existing code. This means we have essentially validated the multi-protocol path. What remains is simply filling in the concrete logic for each protocol within this framework.
The Conclusion First
After integrating the Kafka and AMQP protocols, the core startup logic of broker-server (the entry module responsible for starting all protocol services) added only two new lines of code:
self.start_kafka_broker(app_stop.clone());
self.start_amqp_broker(app_stop.clone());
From construction to startup, neither protocol Broker Server introduced any new global state, any new initialization procedures, or any changes to any public module interfaces. Not a single line of MQTT code was touched.
This is not a coincidence — it is the result of architectural design. Let's break it down piece by piece.
MQTT's Foundation Is Already Solid
RobustMQ's MQTT implementation is already relatively mature. In the process of supporting the full set of MQTT features, we accumulated a solid set of infrastructure: network connection management, cluster-level rate limiting, node metadata caching, gRPC client connection pools, delayed message handling, storage adapters... These components have been stabilized through long-term refinement of MQTT and are naturally suited for cross-protocol reuse.
The key is how these components are organized. They all exist in BrokerServer as Arc<T>, as globally shared objects for the entire service process:
pub struct BrokerServer {
    connection_manager: Arc<NetworkConnectionManager>,
    client_pool: Arc<ClientPool>,
    broker_cache: Arc<NodeCacheManager>,
    global_rate_limiter: Arc<GlobalRateLimiterManager>,
    // ... plus per-protocol params
}
Arc<T> is a reference-counted shared pointer: clone() copies only the pointer and increments the reference count, so every clone refers to the same underlying instance. When Kafka and AMQP need these capabilities, they simply clone() an Arc pointer. There is no data copying, no reinitialization, and no overhead beyond the reference-count update. The connection manager is the same one, the client pool is the same one, the rate limiter is the same one, the cache is the same one. Three protocols share the same infrastructure, completely on equal footing.
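To make the sharing concrete, here is a minimal, dependency-free sketch. ConnectionRegistry and ProtocolServer are hypothetical stand-ins, not RobustMQ types; the only point is that every protocol holds a handle to the same allocation.

```rust
use std::sync::Arc;

// Hypothetical stand-in for a shared component such as the connection manager.
struct ConnectionRegistry {
    name: &'static str,
}

// Hypothetical per-protocol server that only holds a shared handle.
struct ProtocolServer {
    protocol: &'static str,
    connections: Arc<ConnectionRegistry>,
}

// Builds one server per protocol; each gets a clone of the same Arc.
fn build_servers(shared: &Arc<ConnectionRegistry>) -> Vec<ProtocolServer> {
    ["mqtt", "kafka", "amqp"]
        .iter()
        .map(|&protocol| ProtocolServer {
            protocol,
            connections: Arc::clone(shared), // bumps the count, copies no data
        })
        .collect()
}

fn main() {
    let shared = Arc::new(ConnectionRegistry { name: "global" });
    let servers = build_servers(&shared);

    for server in &servers {
        // Every handle points at the same allocation.
        assert!(Arc::ptr_eq(&server.connections, &shared));
        println!("{} uses the {} registry", server.protocol, server.connections.name);
    }

    // One original handle plus three clones.
    assert_eq!(Arc::strong_count(&shared), 4);
}
```

Arc only shares ownership. If a shared component needs mutation, it still has to provide its own interior synchronization, which this sketch leaves out.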
This is the fundamental reason why integration costs are low: MQTT has already laid the foundation, and later protocols simply stand on it and build upward.
Params Struct: Clear Protocol Boundaries
After establishing the shared global objects, another problem needs to be solved: how to pass these objects to each protocol while maintaining clear boundaries, preventing protocols from depending directly on each other.
RobustMQ's solution is to define a dedicated Params struct for each protocol. Each protocol's Broker Server does not accept scattered constructor arguments — it accepts only a single unified struct:
#[derive(Clone)]
pub struct KafkaBrokerServerParams {
    pub connection_manager: Arc<ConnectionManager>,
    pub client_pool: Arc<ClientPool>,
    pub broker_cache: Arc<NodeCacheManager>,
    pub global_limit_manager: Arc<GlobalRateLimiterManager>,
    pub stop_sx: broadcast::Sender<bool>,
    pub proc_config: ProcessorConfig,
}
AmqpBrokerServerParams has an identical structure. This design has several obvious advantages in practice:
Clear boundaries, explicit dependencies. Whatever external dependencies a protocol needs, a look at its struct tells you everything — no need to trace through the code. If someday Kafka needs to add a new dependency, you add a field to the struct, and the compiler will produce errors at all construction sites, forcing developers to consciously handle this dependency rather than silently expanding global state.
Centralized construction, visible costs. All protocol params are assembled in a single place — BrokerServer::new(). Kafka and AMQP currently depend only on global objects, so assembly is entirely synchronous with no async initialization required:
let kafka_params = KafkaBrokerServerParams {
    connection_manager: connection_manager.clone(),
    client_pool: client_pool.clone(),
    broker_cache: broker_cache.clone(),
    global_limit_manager: global_rate_limiter.clone(),
    stop_sx: main_stop_send.clone(),
    proc_config: ProcessorConfig {
        accept_thread_num: config.kafka_runtime.network.accept_thread_num,
        handler_process_num: config.kafka_runtime.network.handler_thread_num,
        channel_size: config.kafka_runtime.network.queue_size,
    },
};
Compare this to MQTT's params construction: MQTT needs to async-initialize over a dozen components like RetainMessageManager, DelayMessageManager, and SubscribeManager inside broker_runtime.block_on(), making the construction process relatively complex. Kafka and AMQP currently have none of these protocol-specific components, so their construction code is extremely concise. As protocol features mature, this struct will grow, but the growth is controlled.
Independently tunable. Each protocol has its own configuration section — [kafka_runtime.network], [amqp_runtime.network] — where network thread count and processing queue size can be set independently. If MQTT traffic is high, give it more threads; Kafka and AMQP are configured as needed, without affecting each other.
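As a rough sketch of what independent tuning looks like in code: the field names below follow the snippets in this article, but the usize types, the default values, the Runtime wrapper, and the mqtt_runtime field are assumptions made for illustration.

```rust
// Hypothetical per-protocol network section; types and defaults are assumed.
#[derive(Clone, Debug, PartialEq)]
struct Network {
    accept_thread_num: usize,
    handler_thread_num: usize,
    queue_size: usize,
}

impl Default for Network {
    fn default() -> Self {
        Network { accept_thread_num: 1, handler_thread_num: 4, queue_size: 1000 }
    }
}

#[derive(Clone, Debug, Default)]
struct Runtime {
    network: Network,
}

// One independent section per protocol.
#[derive(Clone, Debug, Default)]
struct BrokerConfig {
    mqtt_runtime: Runtime,
    kafka_runtime: Runtime,
    amqp_runtime: Runtime,
}

#[derive(Clone, Debug, PartialEq)]
struct ProcessorConfig {
    accept_thread_num: usize,
    handler_process_num: usize,
    channel_size: usize,
}

// Same field mapping as the kafka_params construction shown earlier.
impl From<&Network> for ProcessorConfig {
    fn from(n: &Network) -> Self {
        ProcessorConfig {
            accept_thread_num: n.accept_thread_num,
            handler_process_num: n.handler_thread_num,
            channel_size: n.queue_size,
        }
    }
}

fn main() {
    let mut config = BrokerConfig::default();
    // Give MQTT more handler threads; Kafka and AMQP keep their own values.
    config.mqtt_runtime.network.handler_thread_num = 32;

    let mqtt = ProcessorConfig::from(&config.mqtt_runtime.network);
    let kafka = ProcessorConfig::from(&config.kafka_runtime.network);
    let amqp = ProcessorConfig::from(&config.amqp_runtime.network);

    assert_eq!(mqtt.handler_process_num, 32);
    assert_eq!(kafka.handler_process_num, 4);
    assert_eq!(amqp, kafka);
    println!("mqtt={:?}\nkafka={:?}\namqp={:?}", mqtt, kafka, amqp);
}
```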
Network Layer: Implemented Once, All Protocols Benefit
The network layer is another important leverage point in RobustMQ's multi-protocol architecture. The network-server crate provides a complete network handling framework, abstracting away low-level details like the TCP connection accept loop, read/write buffer management, and connection lifecycle management.
More importantly, network-server already supports five transport-layer protocols: TCP, TLS, WebSocket, WebSocket Secure (WSS), and QUIC. These capabilities are protocol-agnostic — any application-layer protocol integrated into this framework automatically gets support for all five transport layers, with no additional implementation needed whatsoever.
This is hugely significant for onboarding new protocols. When we fill in the protocol logic for Kafka or AMQP, we don't need to ask "should we support TLS?" or "should we support WebSocket?" because the network layer has already handled that for us. Kafka over QUIC? AMQP over TLS? In this framework, these aren't extra work; they're capabilities we get by default.
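The idea of protocol code that never sees the transport can be sketched with a small frame reader. This is not the network-server API: it is a blocking, std-only illustration (the real network layer is asynchronous), read_frame and MAX_FRAME_LEN are hypothetical names, and the 4-byte big-endian length prefix is the size-delimited framing style that Kafka uses.

```rust
use std::io::{self, Cursor, Read};

// Upper bound on a single frame; a hypothetical value for this sketch.
const MAX_FRAME_LEN: usize = 16 * 1024 * 1024;

/// Reads one frame: a 4-byte big-endian length followed by that many bytes.
/// The function is generic over `Read`, so it does not care whether the
/// bytes come from TCP, TLS, or any other stream.
fn read_frame<R: Read>(transport: &mut R) -> io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    transport.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    if len > MAX_FRAME_LEN {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "frame too large"));
    }
    let mut payload = vec![0u8; len];
    transport.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    // An in-memory cursor stands in for a real network stream.
    let mut wire = Cursor::new(vec![0, 0, 0, 3, b'a', b'b', b'c', 0, 0, 0, 1, b'z']);
    assert_eq!(read_frame(&mut wire)?, b"abc".to_vec());
    assert_eq!(read_frame(&mut wire)?, b"z".to_vec());
    Ok(())
}
```

Because the parser depends only on a byte-stream trait, swapping the transport underneath requires no change to the protocol code.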
Stop Signal: One Channel, All Protocols in Sync
Shutdown coordination is an easy place to trip up in a multi-protocol system. The most intuitive approach is to give each protocol its own stop channel and notify them one by one during shutdown. But this introduces ordering dependencies — which one stops first, which stops last? If a channel's receiver isn't ready yet when the sender fires the signal, what happens? These problems aren't hard to solve individually, but each requires careful handling, and the accumulated complexity makes the code brittle.
RobustMQ's approach is to share a single shutdown channel using tokio::sync::broadcast:
let (app_stop, _) = broadcast::channel::<bool>(2);
// ...
self.start_mqtt_broker(app_stop.clone()).await;
self.start_kafka_broker(app_stop.clone());
self.start_amqp_broker(app_stop.clone());
broadcast is one-to-many: the sender sends once, and every receiver that has already subscribed gets the message. Each of the three protocols internally calls stop_sx.subscribe() to subscribe to this channel. When the shutdown signal arrives, all three protocols receive it and shut down in an orderly manner, requiring no external coordination logic. No ordering dependencies, no per-protocol channels to manage, no need to decide "who stops first."
This design has a useful side effect: when MQTT triggers a shutdown for whatever reason, Kafka and AMQP also receive the signal simultaneously, and the entire process exits in a consistent state — there's no intermediate state where one protocol is still running while others have already stopped.
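The one-signal, many-listeners shape can be sketched without any dependencies. Note the substitution: this is not tokio::sync::broadcast but a std-only analogue built from a Mutex, a Condvar, and notify_all, with threads standing in for protocol servers. StopSignal and run_and_stop are hypothetical names. Unlike a broadcast channel, the flag here is sticky, so a worker that starts waiting after stop() still sees it.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// One-to-many stop signal: a flag guarded by a mutex, plus a condvar.
#[derive(Clone, Default)]
struct StopSignal {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl StopSignal {
    /// Sets the flag once and wakes every waiting worker.
    fn stop(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until stop() has been called.
    fn wait(&self) {
        let (flag, cvar) = &*self.inner;
        let mut stopped = flag.lock().unwrap();
        while !*stopped {
            stopped = cvar.wait(stopped).unwrap();
        }
    }
}

/// Starts one worker per protocol, sends a single stop, and returns the
/// names of the workers that exited.
fn run_and_stop(protocols: &[&'static str]) -> Vec<&'static str> {
    let signal = StopSignal::default();
    let workers: Vec<_> = protocols
        .iter()
        .map(|&name| {
            let signal = signal.clone();
            thread::spawn(move || {
                signal.wait(); // a real server would run its accept loop here
                name
            })
        })
        .collect();

    signal.stop(); // one call, no per-protocol notification order
    workers.into_iter().map(|w| w.join().unwrap()).collect()
}

fn main() {
    let stopped = run_and_stop(&["mqtt", "kafka", "amqp"]);
    assert_eq!(stopped, ["mqtt", "kafka", "amqp"]);
}
```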
How Many Steps Does It Take to Integrate a New Protocol?
With this foundation in place, integrating a new protocol (say, the RocketMQ protocol) follows a well-defined process with no surprises.
Step one: Create a new protocol crate.
Create the rocketmq-broker crate, focused on RocketMQ protocol frame parsing and business logic implementation. All low-level network handling relies entirely on network-server, so there is no need to handle TCP accept, connection read/write, or buffer management yourself. Support for all five transport protocols is inherited automatically from the network layer.
Step two: Define a Params struct.
Following the pattern of KafkaBrokerServerParams, define RocketMQBrokerServerParams. At this stage, only the necessary global object dependencies need to be listed; the struct can be expanded incrementally as features are built out. This struct is the complete declaration of the protocol's dependencies on the outside world.
Step three: Add a configuration section.
Add a rocketmq_runtime: RocketMQRuntime field to BrokerConfig, with an embedded network: Network sub-configuration. This is an addition to the existing configuration structure, not a modification — it does not affect any existing protocol configurations.
Step four: Integrate into broker-server.
// Assemble params in BrokerServer::new()
let rocketmq_params = RocketMQBrokerServerParams { ... };
// Start with a single line in BrokerServer::start()
self.start_rocketmq_broker(app_stop.clone());
Done. No changes to any MQTT code, no changes to any public module interfaces, shutdown logic is automatically covered, five transport-layer protocols are automatically supported. The entire integration effort is concentrated on the protocol logic itself; the framework layer costs virtually nothing.
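Put together, the four steps might look roughly like the following compilable skeleton. Everything in it is hypothetical: the types are empty stand-ins for the real RobustMQ objects, an AtomicBool replaces the broadcast sender so the sketch needs no dependencies, and the config values are made up.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Stand-in for a shared global object; the real type lives in RobustMQ.
#[derive(Default)]
struct ConnectionManager;

#[derive(Clone)]
struct ProcessorConfig {
    accept_thread_num: usize,
    handler_process_num: usize,
    channel_size: usize,
}

// Hypothetical params struct: the protocol's full list of outside dependencies.
#[derive(Clone)]
struct RocketMQBrokerServerParams {
    connection_manager: Arc<ConnectionManager>,
    stop: Arc<AtomicBool>, // stand-in for broadcast::Sender<bool>
    proc_config: ProcessorConfig,
}

struct RocketMQBrokerServer {
    params: RocketMQBrokerServerParams,
}

impl RocketMQBrokerServer {
    fn new(params: RocketMQBrokerServerParams) -> Self {
        RocketMQBrokerServer { params }
    }

    fn should_stop(&self) -> bool {
        self.params.stop.load(Ordering::SeqCst)
    }

    fn describe(&self) -> String {
        let c = &self.params.proc_config;
        format!(
            "rocketmq: accept={} handlers={} queue={}",
            c.accept_thread_num, c.handler_process_num, c.channel_size
        )
    }
}

struct BrokerServer {
    connection_manager: Arc<ConnectionManager>,
    stop: Arc<AtomicBool>,
    rocketmq_params: RocketMQBrokerServerParams,
}

impl BrokerServer {
    // Single creation point for shared objects and per-protocol params.
    fn new() -> Self {
        let connection_manager = Arc::new(ConnectionManager::default());
        let stop = Arc::new(AtomicBool::new(false));
        let rocketmq_params = RocketMQBrokerServerParams {
            connection_manager: connection_manager.clone(),
            stop: stop.clone(),
            proc_config: ProcessorConfig {
                accept_thread_num: 1,
                handler_process_num: 4,
                channel_size: 1000,
            },
        };
        BrokerServer { connection_manager, stop, rocketmq_params }
    }

    // The one line added to startup hands the params to the protocol crate.
    fn start_rocketmq_broker(&self) -> RocketMQBrokerServer {
        RocketMQBrokerServer::new(self.rocketmq_params.clone())
    }
}

fn main() {
    let broker = BrokerServer::new();
    let rocketmq = broker.start_rocketmq_broker();
    println!("{}", rocketmq.describe());

    // The protocol server sees the same shared instance and the same stop flag.
    assert!(Arc::ptr_eq(&broker.connection_manager, &rocketmq.params.connection_manager));
    broker.stop.store(true, Ordering::SeqCst);
    assert!(rocketmq.should_stop());
}
```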
Why This Design Works
Looking back at the overall architecture, several key decisions together underpin the extensibility of this multi-protocol framework:
Share, don't copy. Infrastructure is shared as Arc<T>, so all protocols access the same instance: there are no per-protocol copies to keep in sync and no risk of state inconsistency. This is the fundamental assumption of the entire architecture: common capabilities should have only one instance.
Global objects are centrally managed in BrokerServer. All common resources have one and only one creation point. Each protocol can only access them via params — it cannot bypass this boundary to access global state directly. This constraint ensures isolation between protocols and makes resource lifecycle management clear.
Network layer decoupled from protocol logic. The network-server crate handles all transport-layer details; protocol crates focus only on frame parsing and business logic. This dividing line is very stable — upgrades to the network layer (such as adding a new transport protocol) are completely transparent to the protocol layer, and changes to the protocol layer don't affect the network layer.
Shutdown signal broadcast, not point-to-point. Using broadcast to centrally manage shutdown eliminates coordination complexity in multi-protocol scenarios, letting each protocol independently implement its own graceful shutdown logic without needing to know about the existence of other protocols.
Params struct as explicit dependency declaration. Each protocol explicitly declares its dependencies via struct fields, and the compiler forces developers to handle changes when dependencies change. This constraint becomes increasingly valuable as the project grows.
Looking Ahead: Connecting to the Storage and Metadata Layers
The protocol frameworks for Kafka and AMQP are now running, but the business logic is still a skeleton. The core work of the next phase is connecting these two protocols to RobustMQ's existing storage and metadata layers.
The storage layer is ready. RobustMQ's Storage Engine has been built, supporting multiple storage backends: memory, segment files, RocksDB, MySQL, MinIO, and S3. It exposes a consistent read/write interface to upper layers through a unified StorageDriverManager abstraction. MQTT already uses this storage layer at production level; its stability has been proven. Message persistence for Kafka and AMQP can connect directly to this abstraction layer without implementing their own storage logic.
The metadata layer is ready. The Meta Service implements distributed metadata management based on Raft and supports plugin-style extensibility. Topic metadata, partition information, consumer group state — all the data that Kafka and AMQP need to manage can be stored in this metadata service, receiving out-of-the-box distributed consistency guarantees.
The path forward is clear. Taking Kafka as an example, the upcoming work roughly divides into three layers:
- Protocol parsing layer: Implement complete Kafka protocol frame parsing, covering core APIs like Produce, Fetch, Metadata, and OffsetCommit
- Metadata integration: Topic creation, partition assignment, and Broker registration via Meta Service gRPC interfaces; consumer group coordination logic integrated with metadata storage
- Storage integration: Message writes and reads via StorageDriverManager; Offset management reusing the existing OffsetManager
The AMQP path is similar: Exchange, Queue, and Binding metadata managed through Meta Service; message routing and persistence integrated with the storage layer.
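For a sense of what the Kafka protocol parsing layer starts from, here is a minimal sketch that decodes the fixed prefix of a Kafka request header (api_key, api_version, correlation_id, all big-endian) and maps the api_key values of the four APIs named above. The type and function names are hypothetical and are not taken from RobustMQ's code; the client_id field and the request body that follow the prefix are left out.

```rust
/// The Kafka APIs named above, keyed by their protocol-defined api_key.
#[derive(Debug, PartialEq)]
enum ApiKey {
    Produce,
    Fetch,
    Metadata,
    OffsetCommit,
    Other(i16),
}

impl From<i16> for ApiKey {
    fn from(key: i16) -> Self {
        match key {
            0 => ApiKey::Produce,
            1 => ApiKey::Fetch,
            3 => ApiKey::Metadata,
            8 => ApiKey::OffsetCommit,
            other => ApiKey::Other(other),
        }
    }
}

#[derive(Debug, PartialEq)]
struct RequestHeader {
    api_key: ApiKey,
    api_version: i16,
    correlation_id: i32,
}

/// Parses the first 8 bytes of a request frame (after the length prefix).
/// Returns None if the frame is too short.
fn parse_header(frame: &[u8]) -> Option<RequestHeader> {
    if frame.len() < 8 {
        return None;
    }
    let api_key = i16::from_be_bytes([frame[0], frame[1]]);
    let api_version = i16::from_be_bytes([frame[2], frame[3]]);
    let correlation_id = i32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]);
    Some(RequestHeader {
        api_key: ApiKey::from(api_key),
        api_version,
        correlation_id,
    })
}

fn main() {
    // api_key = 3 (Metadata), api_version = 9, correlation_id = 42
    let frame = [0u8, 3, 0, 9, 0, 0, 0, 42];
    let header = parse_header(&frame).expect("frame has a full header prefix");
    assert_eq!(header.api_key, ApiKey::Metadata);
    println!("{:?}", header);
}
```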
The most important thing on this road is: the interfaces of the storage and metadata layers are already stable, and MQTT's practice has proven their design is sound. The business logic implementation of Kafka and AMQP can be built on top of these validated foundations — no need to reinvent the wheel, and no risk of being blocked by problems at the foundational layer.
In our view, what's most gratifying is that in the process of adapting the Kafka and AMQP protocols, we validated that the long-term architectural and code refinement — the continuous adjustment and optimization — has been effective. It also gave us the confidence to keep quietly polishing our Broker, and to wait patiently for the blossoms to come.
