RobustMQ: Reflections and Exploration in AI Scenarios
While building RobustMQ, we've been thinking about a question: What application scenarios should the next-generation message platform serve? When we dove into AI training, we found an unexpected but reasonable answer—AI training data pipelines might be the ideal scenario to validate RobustMQ's technical capabilities.
An Unexpected Discovery
Initially we didn't prioritize AI. Our main focus was MQTT protocol implementation for the traditional IoT device connectivity scenario. But during a technical exchange, an engineer from a large model training team complained: "Our GPU utilization is only around 60%. For 30–40% of the time, GPUs are waiting for data. We're burning over $100,000 per day, with a third wasted on data loading."
That made us ask why. They had tried Kafka, but latency was too high. They had tried Redis, but the dataset was too large to fit. They had tried reading directly from S3, but it couldn't handle the concurrency. They were piecing together various tools, none of which was truly suited to AI training.
When we delved into AI training's data flow characteristics, we found this is exactly where RobustMQ's tech stack excels. Rust's zero-GC means more stable latency; compute-storage separation means elastic adaptation to training cluster changes; high-performance kernel means TB-level throughput. These features designed for IoT and general scenarios found perfect application in AI scenarios.
Data Challenges in AI Training
To understand RobustMQ's value in AI scenarios, we first need to understand AI training's data challenges.
A large model training project typically uses dozens or hundreds of GPUs, each costing $2–3 per hour. Training data can be several TB or even tens of TB, scattered in S3 or HDFS. During training, data must be continuously read from storage and sent to GPUs for computation.
The problem: GPU compute speed far exceeds data loading speed. An A100 GPU reads from its own memory at more than 1.5 TB/s, but a single S3 read can take tens to hundreds of milliseconds. GPUs spend a lot of time waiting, which is pure resource waste. At current GPU costs, this waste can reach thousands to tens of thousands of dollars per day.
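To make the scale of that waste concrete, here is a back-of-the-envelope calculation. The cluster size, hourly price, and idle fraction are illustrative assumptions chosen from the ranges mentioned in this article, not measurements.

```python
def daily_idle_cost(num_gpus: int, price_per_gpu_hour: float, idle_fraction: float) -> float:
    """Dollars per day paid for GPU time spent waiting on data."""
    return num_gpus * price_per_gpu_hour * 24 * idle_fraction


# Illustrative inputs, not measurements: 128 GPUs at $2.50/hour, idle 35% of the time.
cost = daily_idle_cost(num_gpus=128, price_per_gpu_hour=2.50, idle_fraction=0.35)
print(f"${cost:,.0f} per day")  # $2,688 per day
```

With a cluster ten times larger, the same idle fraction lands in the tens of thousands of dollars per day.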
Modern large model training is distributed. Dozens or hundreds of GPUs work simultaneously, frequently synchronizing gradients and exchanging parameters. Collective communication libraries such as NCCL work well within a single multi-GPU machine, but efficiency drops noticeably across machines. And once a node fails, the whole training task can fail, losing days of computation.
Data preprocessing is also a challenge. Training data often needs cleaning, augmentation, feature extraction, etc. These steps are usually coupled with training, making debugging hard and extensibility poor. Trying a new data augmentation strategy means changing the whole training code.
What RobustMQ Can Do
High-Performance Data Buffering and Distribution
RobustMQ's first role in AI training is as a high-performance data buffering layer. In the traditional architecture, each GPU training process reads directly from S3 or HDFS. With 128 GPUs training simultaneously, 128 concurrent read requests hit storage. S3 has limited concurrency—requests get throttled and latency increases. Each read goes over the network, typically 50–200ms latency.
RobustMQ's approach: add a message buffering layer between storage and compute. We preload training data from S3 in batches into RobustMQ cluster memory and SSD. This preloading can happen before training starts or continuously during training without blocking GPU computation. After loading into RobustMQ, data is distributed to each GPU with microsecond-level latency.
The implementation: RobustMQ maintains a multi-layer data cache. Hottest data stays in memory for microsecond access; next-hot in local SSD with millisecond latency; colder data loaded from remote storage on demand. Cache policy uses LRU combined with training data access patterns for intelligent prefetching.
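The tiering idea can be sketched in a few lines of Python. This is a minimal illustration, not RobustMQ's implementation: `TieredCache`, its capacities, and the `load_remote` callback are hypothetical names, both tiers are plain in-process dictionaries, and prefetching is reduced to warming the cache with keys the caller expects to need.

```python
from collections import OrderedDict


class TieredCache:
    """Two LRU tiers (memory, then SSD) in front of a slow remote loader."""

    def __init__(self, mem_capacity, ssd_capacity, load_remote):
        self.mem = OrderedDict()   # hottest entries
        self.ssd = OrderedDict()   # stands in for a local SSD store
        self.mem_capacity = mem_capacity
        self.ssd_capacity = ssd_capacity
        self.load_remote = load_remote  # hypothetical callback, e.g. an S3 read

    def get(self, key):
        if key in self.mem:
            self.mem.move_to_end(key)        # mark as most recently used
            return self.mem[key]
        if key in self.ssd:
            value = self.ssd.pop(key)        # promote from SSD to memory
        else:
            value = self.load_remote(key)    # coldest path: remote storage
        self._put_mem(key, value)
        return value

    def _put_mem(self, key, value):
        self.mem[key] = value
        self.mem.move_to_end(key)
        if len(self.mem) > self.mem_capacity:
            old_key, old_value = self.mem.popitem(last=False)  # evict LRU entry
            self.ssd[old_key] = old_value                      # demote to SSD tier
            if len(self.ssd) > self.ssd_capacity:
                self.ssd.popitem(last=False)                   # drop the coldest entry

    def prefetch(self, keys):
        """Warm the cache with batches the training loop will ask for next."""
        for key in keys:
            self.get(key)
```

Because training usually walks the dataset in a known order, calling `prefetch` with the next few batch keys is often enough to keep the memory tier ahead of the consumer.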
Training processes fetch data from RobustMQ through a simple consumer API:
from robustmq import Consumer

# Subscribe to the stream that carries pre-batched training data
consumer = Consumer('training-data-stream')
for message in consumer:
    images, labels = message.decode()   # one decoded batch
    loss = model.train(images, labels)  # model is defined by the training script

GPUs barely wait for data. RobustMQ preloads and caches the next few batches; when training calls the consume interface, data is already in memory and returns immediately.
Our initial tests show: in a 64-GPU training cluster, using RobustMQ as data buffer, data loading P99 latency dropped from 150ms to 3ms, GPU utilization rose from 65% to 88%, and training time shortened by about 25%. For long-running training, this means significant cost savings.
RobustMQ also supports intelligent backpressure. When training slows (e.g., during validation), RobustMQ automatically reduces data load speed to avoid memory overflow. When training resumes, it speeds up. This adaptive mechanism improves stability.
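The simplest form of backpressure is a bounded buffer: the loader blocks when the buffer is full and resumes as soon as the consumer drains it. The sketch below uses Python's standard `queue.Queue` to show the behaviour; `run_demo` and its parameters are hypothetical, and the real mechanism inside RobustMQ may differ.

```python
import queue
import threading
import time


def run_demo(num_batches=20, buffer_size=4):
    """Feed batches through a bounded buffer; return what was consumed and the peak depth."""
    buffer = queue.Queue(maxsize=buffer_size)

    def loader():
        for i in range(num_batches):
            buffer.put(i)   # blocks while the buffer is full: this is the backpressure
        buffer.put(None)    # sentinel: no more data

    threading.Thread(target=loader, daemon=True).start()

    consumed, max_depth = [], 0
    while True:
        max_depth = max(max_depth, buffer.qsize())
        batch = buffer.get()
        if batch is None:
            break
        if batch == 5:
            time.sleep(0.05)  # simulate a validation pause on the training side
        consumed.append(batch)
    return consumed, max_depth


consumed, max_depth = run_demo()
print(len(consumed), "batches consumed, peak buffer depth", max_depth)
```

During the simulated pause the loader fills the buffer and then stops, so memory use is capped at `buffer_size` batches no matter how long the pause lasts.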
Distributed Training Communication Bus
RobustMQ's second role is as distributed training communication infrastructure. In large-scale distributed training, GPU nodes need frequent information exchange: gradient sync, parameter updates, training state coordination.
NCCL-based training is typically synchronous and blocking. All GPUs compute gradients for one batch, then perform an All-Reduce to aggregate gradients and update model parameters. The fastest GPU waits for the slowest, so overall speed is set by the slowest node. Across machines, network latency amplifies this cost and efficiency drops noticeably.
RobustMQ provides asynchronous gradient sync. Each GPU publishes to RobustMQ's gradient topic immediately after computing gradients—no waiting for others. Each GPU subscribes to the topic, receives other GPUs' gradients, then updates parameters.
Implementation: each GPU node runs a gradient publisher and aggregator. After computing gradients, the publisher serializes the gradient tensor (we support zero-copy tensor serialization) and sends to RobustMQ. The aggregator continuously receives other nodes' gradients from RobustMQ and accumulates in local gradient buffer. When accumulated gradients reach a threshold (e.g., 80% of nodes), it triggers parameter update without waiting for all nodes.
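The threshold rule can be sketched as a small aggregator. This is an illustration under stated assumptions: `GradientAggregator` is a hypothetical name, gradients are plain lists of floats rather than tensors, and the update uses a simple average of whatever arrived before the quorum was met.

```python
import math


class GradientAggregator:
    """Collect gradients per node and release an average once a quorum has reported."""

    def __init__(self, num_nodes, quorum=0.8):
        self.needed = math.ceil(num_nodes * quorum)
        self.pending = {}  # node_id -> gradient (list of floats)

    def receive(self, node_id, gradient):
        """Store one node's gradient; return the averaged gradient once the quorum is met."""
        self.pending[node_id] = gradient
        if len(self.pending) < self.needed:
            return None
        grads = list(self.pending.values())
        self.pending = {}  # start collecting for the next step
        return [sum(column) / len(grads) for column in zip(*grads)]


agg = GradientAggregator(num_nodes=5)           # quorum is 4 of 5 nodes
assert agg.receive(0, [1.0, 2.0]) is None
assert agg.receive(1, [3.0, 4.0]) is None
assert agg.receive(2, [5.0, 6.0]) is None
print(agg.receive(3, [7.0, 8.0]))               # [4.0, 5.0]
```

Skipping the slowest nodes trades some gradient freshness for step time, so the quorum fraction is a tuning knob rather than a fixed constant.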
This asynchronous mode has clear advantages at scale. In our tests on 128-GPU distributed training, NCCL synchronous mode took about 180ms per step, of which 50ms was gradient sync. In RobustMQ asynchronous mode, gradient sync took about 30ms and did not block computation, giving an overall speedup of roughly 15–20%.
RobustMQ also supports dynamic node management. GPU nodes can be added or removed during training. When a new node joins, it joins the gradient topic consumer group; RobustMQ automatically adjusts message distribution and the new node starts receiving gradients and participating. When a node exits (e.g., spot instance preempted), RobustMQ detects the disconnect and reassigns that node's data shards to others.
Fault tolerance is also important. We record each GPU node's training progress via message offset. When a node fails and restarts, it can query RobustMQ for its last consumption position and resume from the checkpoint—no restart from scratch. For tasks running days or weeks, this avoids massive recomputation.
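Offset-based recovery can be sketched as follows. The `OffsetStore` class and `train` function are hypothetical stand-ins: the store plays the role of the broker's committed offsets, and the log is an in-memory list rather than a real topic.

```python
class OffsetStore:
    """Stand-in for the broker's record of each node's last committed offset."""

    def __init__(self):
        self._offsets = {}

    def commit(self, node_id, offset):
        self._offsets[node_id] = offset

    def last_committed(self, node_id):
        return self._offsets.get(node_id, -1)  # -1 means nothing processed yet


def train(node_id, log, store, fail_at=None):
    """Process the log from the first uncommitted offset; optionally simulate a crash."""
    processed = []
    start = store.last_committed(node_id) + 1
    for offset in range(start, len(log)):
        if offset == fail_at:
            raise RuntimeError("simulated node failure")
        processed.append(log[offset])    # stand-in for one training step
        store.commit(node_id, offset)    # commit after the step: at-least-once delivery
    return processed


log = [f"batch-{i}" for i in range(10)]
store = OffsetStore()
try:
    train("gpu-0", log, store, fail_at=4)
except RuntimeError:
    pass
print(train("gpu-0", log, store))  # resumes at batch-4, not batch-0
```

Committing after the step means a crash between the step and the commit replays one batch, which is why the training step should tolerate a repeated batch.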
Efficient Large File and Tensor Transfer
AI training often needs to transfer large files: model weights, checkpoints, intermediate results. GPT-3 model files are ~700GB; larger models can exceed a TB. How to efficiently transfer these between cluster nodes is a practical problem.
Traditional message queues limit message size. Kafka's default maximum message size is about 1MB; it can be raised through configuration, but in practice messages beyond roughly 10MB are uncommon. Transferring 700GB with Kafka means splitting it into tens of thousands of small messages and reassembling them at the receiver, which is tedious and error-prone.
RobustMQ implements transparent large-file chunked transfer. When the sender calls the large-file send API, RobustMQ automatically splits the file into 64MB or 128MB chunks, each sent as a separate message. RobustMQ also generates file metadata including file ID, total chunks, per-chunk hashes, and file metadata (size, type, compression, etc.).
The receiver gets metadata first, knows how many chunks to expect, then receives chunks in parallel. Chunks can arrive out of order—the receiver maintains a bitmap of received vs. pending chunks. After all chunks arrive, it verifies each chunk's hash and reassembles in order.
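The chunk-and-reassemble flow can be sketched with the standard library. This is not the RobustMQ API: `split_file` and `Receiver` are hypothetical names, SHA-256 is an assumed hash choice, a list with `None` entries stands in for the bitmap, and for simplicity each chunk is verified when it arrives rather than after all chunks are in.

```python
import hashlib


def split_file(data: bytes, chunk_size: int):
    """Return (manifest, [(index, chunk), ...]) for a payload."""
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    manifest = {
        "size": len(data),
        "total_chunks": len(chunks),
        "hashes": [hashlib.sha256(c).hexdigest() for c in chunks],
    }
    return manifest, list(enumerate(chunks))


class Receiver:
    def __init__(self, manifest):
        self.manifest = manifest
        self.chunks = [None] * manifest["total_chunks"]  # None marks a pending chunk

    def on_chunk(self, index, payload):
        if hashlib.sha256(payload).hexdigest() != self.manifest["hashes"][index]:
            raise ValueError(f"chunk {index} failed hash check")
        self.chunks[index] = payload

    def missing(self):
        """Indexes still pending; after a reconnect only these need to be re-sent."""
        return [i for i, c in enumerate(self.chunks) if c is None]

    def assemble(self):
        if self.missing():
            raise RuntimeError(f"still waiting for chunks {self.missing()}")
        return b"".join(self.chunks)


data = bytes(range(256)) * 10
manifest, chunks = split_file(data, chunk_size=1000)
receiver = Receiver(manifest)
for index, payload in reversed(chunks):  # deliver out of order
    receiver.on_chunk(index, payload)
assert receiver.assemble() == data
```

Because each chunk carries its index, the receiver never depends on arrival order, and `missing()` is all the state a resumed transfer needs.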
Parallel transfer is the key optimization. Traditional sequential transfer is limited by single TCP connection bandwidth. RobustMQ can establish multiple parallel connections, each transferring different chunks. In our test environment (10Gbps network), transferring a 500GB file: sequential ~7 minutes; RobustMQ 8-way parallel ~90 seconds—nearly 5x faster.
Checkpoint resume guarantees reliability. If the network fails during transfer, the receiver keeps received chunks. After reconnecting, only missing chunks need transfer—no restart from scratch. For TB-scale files, this saves significant time.
For tensor transfer, we've done specialized optimization. Tensors have specific formats (shape, dtype, strides); we support zero-copy tensor serialization. The sender reads directly from in-memory tensor structure, adds metadata, and sends—no conversion to generic format first. The receiver constructs the tensor object directly, avoiding extra memory copies.
We're also exploring GPU Direct RDMA. This lets data go directly from the network adapter to GPU memory, bypassing CPU and system memory entirely. If implemented, latency drops further and CPU involvement decreases.
Decoupled Data Processing Pipeline
AI training data processing usually involves multiple stages: raw data read from storage, cleaning, augmentation, feature extraction, batching, feeding to GPU. Traditionally these are in one training script or PyTorch's DataLoader.
This coupled architecture has issues. First, poor extensibility—if augmentation is CPU-heavy and training GPU-heavy, they have different resource needs but can't scale independently when coupled. Second, poor flexibility—trying a new augmentation strategy means changing training code and risking stability. Third, hard to debug—if training is poor, it's hard to tell if it's data or model.
RobustMQ can decouple this pipeline. Each processing stage is an independent service connected by message queue:
Raw data reader → Topic: raw-data
↓
Augmentation service → Topic: augmented-data
↓
Feature extraction service → Topic: features
↓
Training service (GPU)

The raw data service loads images, text, etc. from S3 and publishes to "raw-data". The augmentation service subscribes to "raw-data", does rotation, crop, color adjustment, etc., and publishes to "augmented-data". Feature extraction subscribes to "augmented-data", extracts features, and publishes to "features". The training service subscribes to "features" and feeds processed data to the GPU.
With this architecture, each stage can deploy, scale, and monitor independently. If augmentation is the bottleneck (monitoring shows augmented-data consumption delay increasing), scale up augmentation instances from 2 to 10 without touching other stages. Want to try new augmentation? Start a new augmentation service consuming from "raw-data" and publishing to "augmented-v2"; point the training service to "augmented-v2"—no other changes.
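The shape of such a pipeline can be sketched with in-process queues standing in for topics. Everything here is illustrative: the `stage` helper is hypothetical, the "augmentation" and "feature extraction" functions are toys, and a real deployment would run each stage as a separate service against RobustMQ topics.

```python
import queue
import threading

STOP = object()  # sentinel that tells each stage to shut down


def stage(source, sink, fn):
    """Consume from one topic, transform each item, publish to the next topic."""
    def run():
        while True:
            item = source.get()
            if item is STOP:
                sink.put(STOP)   # propagate shutdown downstream
                return
            sink.put(fn(item))
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


topics = {name: queue.Queue() for name in ("raw-data", "augmented-data", "features")}
stage(topics["raw-data"], topics["augmented-data"], lambda x: x[::-1])  # toy augmentation: flip
stage(topics["augmented-data"], topics["features"], lambda x: sum(x))   # toy feature: sum

for sample in ([1, 2, 3], [4, 5, 6]):
    topics["raw-data"].put(sample)
topics["raw-data"].put(STOP)

features = []
while (item := topics["features"].get()) is not STOP:
    features.append(item)
print(features)  # [6, 15]
```

Swapping in a new augmentation strategy only means starting another `stage` that reads the same source and writes to a different sink; the other stages are untouched.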
Debugging becomes intuitive. Each stage's input/output is in the message queue—easy to sample and inspect. Found an abnormal batch? Trace its data path through raw data, augmented data, extracted features—locate the problem at each stage. RobustMQ's message IDs and timestamps make this easy.
Data processing reuse becomes possible. The same raw data can flow through different pipelines to different training datasets. One augmentation service does light augmentation, another heavy, publishing to different topics. Training teams can run multiple experiments comparing different data processing strategies without re-reading raw data.
Multimodal Data Alignment
Modern large model training increasingly involves multimodal data: text, images, audio, video. These modalities need strict alignment. E.g., image-text pair training—each image must match its description; video understanding—video segments must match subtitles and audio.
Traditionally this is managed via file system with filenames or indexes. E.g., img_001.jpg pairs with caption_001.txt, audio_001.wav with video_001.mp4. This is error-prone: filename collisions, index misalignment, high manual management cost. And hard to extend—adding a new modality (e.g., depth map) requires adjusting the whole data management logic and new naming rules.
RobustMQ's message model naturally supports complex data structures. A message can contain multiple fields of different types. For multimodal training samples:
{
  "sample_id": "train_00001",
  "image": "<binary data>",
  "caption": "An orange cat sitting on a sofa",
  "audio": "<binary data>",
  "depth_map": "<binary data>",
  "metadata": {
    "source": "dataset_v2.1",
    "timestamp": "2025-01-15T08:30:00Z",
    "quality_score": 0.95,
    "tags": ["animal", "indoor", "furniture"]
  }
}

One message encapsulates a complete training sample. All modalities are bound together, so they cannot drift out of alignment once the message is built. When the training program consumes, it gets complete, aligned multimodal data.
Another advantage: flexibility. Different training tasks may only need some modalities. Text-to-image generation needs only text and image—configure consumer to transfer only those fields, saving bandwidth. Image classification needs only image and label—skip audio. RobustMQ supports field-level selective transfer.
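Field selection is essentially a projection over the sample. The sketch below shows the idea on the consumer side with a hypothetical `project` helper; the bandwidth saving described above only materializes if the same projection runs on the broker before the message is sent.

```python
def project(sample: dict, fields: set) -> dict:
    """Keep only the requested fields; sample_id is always kept for traceability."""
    wanted = set(fields) | {"sample_id"}
    return {key: value for key, value in sample.items() if key in wanted}


sample = {
    "sample_id": "train_00001",
    "image": b"<image bytes>",
    "caption": "An orange cat sitting on a sofa",
    "audio": b"<audio bytes>",
    "depth_map": b"<depth bytes>",
}

# A text-to-image task only needs the caption and the image.
print(sorted(project(sample, {"image", "caption"})))  # ['caption', 'image', 'sample_id']
```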
Version management becomes simple. Different dataset versions go to different topics. "training-data-v1.0" is initial; "training-data-v1.1" fixes some labels; "training-data-v2.0" adds depth and audio. Specify which topic to consume when training—that's your dataset version.
Streaming Training Data Updates
In continuous or online learning, training data isn't static—it's continuously produced. E.g., recommendation model training needs user behavior updates; reinforcement learning needs latest agent experience; content moderation needs new violation examples.
Traditional batch processing re-prepares data periodically (e.g., daily) and restarts training. This means high latency and poor resource use, because each restart discards previous computation and starts over.
RobustMQ supports streaming training data updates. New data is continuously published to the training data topic; training processes consume continuously. With incremental learning, the model keeps updating without full retraining.
Message ordering preserves temporal relationships. RobustMQ guarantees that messages in the same partition are consumed in the order they were published. For time-sensitive data, route records that must stay in order to the same partition and publish them in timestamp order, so training sees them in the correct sequence.
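One common way to get per-partition ordering for related records is to route them by a stable key. The sketch below is an assumption about how a producer might do this, not a description of RobustMQ's partitioner: `partition_for` and `publish` are hypothetical, and CRC32 is used because Python's built-in `hash()` for strings changes between processes.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    """Stable hash so the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def publish(events, num_partitions):
    """events: iterable of (timestamp, key, payload), already sorted by timestamp."""
    partitions = defaultdict(list)
    for timestamp, key, payload in events:
        partitions[partition_for(key, num_partitions)].append((timestamp, key, payload))
    return partitions


events = [(t, f"user-{t % 3}", f"event-{t}") for t in range(12)]
for partition, messages in sorted(publish(events, num_partitions=4).items()):
    print(partition, [payload for _, _, payload in messages])
```

Every event for a given user lands in one partition, so a consumer reading that partition sees that user's events in timestamp order.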
Offset management lets training processes control consumption progress. Training records the processed offset and can pause, resume, or rewind to a previous position. Useful for debugging and experiments.
Message retention is flexible. Configure by time (e.g., last 7 days) or by size (e.g., last 100GB). Historical data can be used for retrospective analysis or periodic retraining.
Experiment Management and Data Traceability
AI R&D is a process of constant experimentation. Same dataset, different model architecture, hyperparameters, training strategy—different results. Researchers try many combinations to find optimal configuration.
Managing these experiments is challenging. For each experiment: what data? what model? what parameters? how long? how did it perform? Months later, reproducing a result—how to ensure exactly the same data and config?
RobustMQ's message model helps trace experiment details. Each training data message carries rich metadata: dataset version, preprocessing params, data source, quality score. Training records the consumed message range—start offset, end offset. After training, we know exactly which data was used and how it was processed.
With RobustMQ's message replay, experiments can be precisely reproduced. Record a successful experiment's consumption range and params (consumer group ID, start offset, end offset)—next time replay the exact message sequence for identical data. Valuable for debugging, validating improvements, publishing papers—reviewers care about reproducibility.
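A replayable run boils down to a small manifest: the offset range, the parameters, and a fingerprint of the data. The sketch below uses an in-memory list of byte strings as the log; `record_run` and `replay` are hypothetical helpers, and the SHA-256 fingerprint is an added safeguard that the article does not describe.

```python
import hashlib


def _fingerprint(messages):
    digest = hashlib.sha256()
    for message in messages:
        digest.update(message)
    return digest.hexdigest()


def record_run(log, start_offset, end_offset, params):
    """Describe exactly which messages a training run consumed."""
    return {
        "start_offset": start_offset,
        "end_offset": end_offset,  # inclusive
        "params": params,
        "data_sha256": _fingerprint(log[start_offset:end_offset + 1]),
    }


def replay(log, manifest):
    """Return the same message sequence, or fail if the log no longer matches."""
    messages = log[manifest["start_offset"]:manifest["end_offset"] + 1]
    if _fingerprint(messages) != manifest["data_sha256"]:
        raise ValueError("log contents changed since the run was recorded")
    return messages


log = [f"sample-{i}".encode() for i in range(100)]
manifest = record_run(log, start_offset=10, end_offset=19, params={"lr": 3e-4})
assert replay(log, manifest) == log[10:20]
```

Storing the manifest next to the experiment results is enough to re-run the experiment on identical data for as long as the retention policy keeps those offsets.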
Message tags and indexing also help. Tag specific data messages: "high-quality", "edge-case", "verified". During training, consume selectively—e.g., train base model on high-quality data first, then fine-tune on full data including edge cases.
RobustMQ supports metadata-based query and filtering. Quickly find "image-text pairs with quality score > 0.9" or "samples from a specific source." This enables easy data analysis—statistics, bias detection, problematic sample localization.
Distributed Checkpoint Coordination
Long-running training needs periodic checkpoints, both for fault tolerance (resuming after an interruption) and for evaluation (loading the model at different training stages).
Checkpoints usually include model weights, optimizer state, random seed, training step, etc.—from a few GB to hundreds of GB. In distributed training, each node has its part of model and state. How to coordinate all nodes to save checkpoints simultaneously? How to ensure consistency? How to transfer and store efficiently?
RobustMQ can coordinate checkpoint saving. The training coordinator publishes a "save checkpoint" control message to a coordination topic with checkpoint ID, save time, etc. All training nodes subscribe; on receipt, they pause after the current batch, serialize state, and publish to the checkpoint topic.
Each node's checkpoint message includes: node ID, checkpoint ID, model shard data, optimizer state, training step, random seed. The coordinator subscribes to the checkpoint topic and collects all nodes' messages. When all nodes' checkpoints are collected (checked via node ID list), the coordinator confirms success and publishes an acknowledgment. Training nodes resume after receiving the ack.
This ensures checkpoint consistency. All nodes' checkpoints correspond to the same training step—no mix of some nodes with new state and others with old. Saving is parallel—all nodes serialize and send simultaneously; total time is the slowest node's time, much faster than sequential.
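The coordinator's bookkeeping can be sketched as below. `CheckpointCoordinator` is a hypothetical class; it tracks only node IDs and training steps, leaving out the shard payloads, timeouts, and the actual publish of the acknowledgment.

```python
class CheckpointCoordinator:
    """Track which nodes have reported for each checkpoint ID."""

    def __init__(self, node_ids):
        self.expected = set(node_ids)
        self.received = {}  # checkpoint_id -> {node_id: training_step}

    def on_checkpoint(self, checkpoint_id, node_id, step):
        """Record one node's checkpoint message; return True once the checkpoint is complete."""
        if node_id not in self.expected:
            raise ValueError(f"unknown node {node_id}")
        shards = self.received.setdefault(checkpoint_id, {})
        shards[node_id] = step
        if set(shards) != self.expected:
            return False                       # still waiting for other nodes
        if len(set(shards.values())) != 1:
            raise RuntimeError("nodes saved different training steps")
        return True                            # the coordinator would now publish the ack


coordinator = CheckpointCoordinator(["gpu-0", "gpu-1", "gpu-2"])
print(coordinator.on_checkpoint("ckpt-7", "gpu-0", step=7000))  # False
print(coordinator.on_checkpoint("ckpt-7", "gpu-2", step=7000))  # False
print(coordinator.on_checkpoint("ckpt-7", "gpu-1", step=7000))  # True
```

The step comparison is what enforces consistency: a node that kept training past the barrier would report a different step and the checkpoint would be rejected instead of silently mixing states.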
Checkpoint data can be persisted in RobustMQ or further stored in S3. RobustMQ's large-file transfer makes checkpoint distribution efficient. If you need to resume on a new cluster or distribute checkpoints to multiple environments, it's quick—no long file copies.
The checkpoint topic can retain the most recent N checkpoints (e.g., 10). Easy rollback. If the latest checkpoint is problematic (e.g., model diverging), load the previous one and continue—only losing the time between the two checkpoints.
Dynamic Training Parameter Configuration
AI training hyperparameters heavily affect results: learning rate, batch size, weight decay, dropout, etc. Traditionally parameters are in config files; changing parameters means restarting the training task. For multi-day runs, restart is costly.
RobustMQ supports dynamic training parameter updates. Training nodes subscribe to a parameter config topic; the coordinator can publish new config anytime. Training nodes apply new parameters at a suitable moment (e.g., after the current epoch) without interrupting training.
Useful for learning rate scheduling. Many strategies use decay: high rate for fast convergence, lower rate for fine-tuning. The coordinator monitors the loss curve and publishes a lower learning rate when loss stops improving. All training nodes update and continue.
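The coordinator's decision rule can be as simple as reduce-on-plateau. The sketch below is one possible rule, with hypothetical names and default values; `observe` returns the new learning rate that the coordinator would publish to the config topic, or `None` when nothing changes.

```python
class PlateauScheduler:
    """Lower the learning rate when the loss has not improved for `patience` observations."""

    def __init__(self, lr, patience=3, factor=0.5, min_delta=1e-4):
        self.lr = lr
        self.patience = patience
        self.factor = factor
        self.min_delta = min_delta
        self.best = float("inf")
        self.stale = 0  # observations since the last improvement

    def observe(self, loss):
        """Return a new learning rate to publish, or None if it is unchanged."""
        if loss < self.best - self.min_delta:
            self.best = loss
            self.stale = 0
            return None
        self.stale += 1
        if self.stale < self.patience:
            return None
        self.stale = 0
        self.lr *= self.factor
        return self.lr


scheduler = PlateauScheduler(lr=0.1, patience=2)
for loss in (1.0, 0.9, 0.9, 0.9):
    update = scheduler.observe(loss)
    if update is not None:
        print("publish new learning rate:", update)  # 0.05
```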
Automated hyperparameter tuning can use this mechanism. Optimization algorithms (Bayesian, genetic, etc.) adjust parameters based on training results and publish via RobustMQ. Training nodes apply and feed back. Real-time tuning can accelerate finding optimal config compared to grid or random search.
Parameter change history is recorded in messages. Each change is a message with timestamp, values, and reason. Combined with training metrics, this helps analyze how parameters affect results and understand training dynamics. Valuable for debugging and optimizing training strategy.
Real-time Monitoring and Anomaly Detection
Training needs close monitoring. Is data loading speed normal? GPU utilization? Is the loss curve reasonable? Data quality issues? Where's the bottleneck?
RobustMQ provides rich metrics. Per topic: production rate (messages/sec, bytes/sec), consumption rate, queue backlog (unconsumed messages), end-to-end latency. Per consumer: consumption speed, lag (how far behind production), error rate, retry count.
These metrics connect directly to training results. E.g., low GPU utilization—check RobustMQ monitoring, might see "features" topic consumption delay increasing, so data loading is the bottleneck. Dig deeper: "augmented-data" production rate dropping, so augmentation might be the problem. Further: one augmentation instance at 100% CPU—pinpoint the bottleneck. This layered tracing makes performance diagnosis systematic.
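The first step of that tracing is computing lag per topic from two sets of offsets. The helpers and the numbers below are hypothetical; a real setup would read these values from RobustMQ's metrics rather than from hard-coded dictionaries.

```python
def consumer_lag(latest_offsets: dict, committed_offsets: dict) -> dict:
    """Messages produced but not yet consumed, per topic."""
    return {
        topic: latest_offsets[topic] - committed_offsets.get(topic, 0)
        for topic in latest_offsets
    }


def most_backlogged(lag: dict) -> str:
    """Topic with the largest backlog: a reasonable place to start looking."""
    return max(lag, key=lag.get)


# Made-up offsets for illustration only.
latest = {"raw-data": 120_000, "augmented-data": 80_000, "features": 79_500}
committed = {"raw-data": 80_200, "augmented-data": 79_600, "features": 79_450}

lag = consumer_lag(latest, committed)
print(lag)                   # {'raw-data': 39800, 'augmented-data': 400, 'features': 50}
print(most_backlogged(lag))  # raw-data
```

A single snapshot only shows where messages are piling up; comparing lag across two points in time shows whether the backlog is growing or draining.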
RobustMQ also records detailed per-message processing traces: produce timestamp, processing stages (per-stage duration), consumed by which training node, consume time—the full path is traceable. Useful for both diagnosis and pipeline optimization—e.g., if a stage is often the bottleneck, consider algorithm optimization or more resources.
Anomaly detection is also important. RobustMQ can monitor message statistics and identify anomalies. E.g., average image size suddenly increases (possibly abnormal high-resolution images), or some messages take unusually long (possibly corrupted)—RobustMQ can auto-flag or alert.
Slow message tracking identifies messages exceeding a processing threshold and records details: message summary, processing time, node, possible cause. This helps optimize data processing logic—e.g., if certain image types cause slow augmentation, adjust strategy or pre-filter.
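A minimal version of slow message tracking wraps the processing call with a timer and records anything over the threshold. `track_slow` is a hypothetical helper, and the injectable `clock` parameter exists only to make the sketch easy to test.

```python
import time


def track_slow(messages, process, threshold_seconds, clock=time.perf_counter):
    """Run `process` on each (message_id, payload) and report the ones over the threshold."""
    slow = []
    for message_id, payload in messages:
        start = clock()
        process(payload)
        elapsed = clock() - start
        if elapsed > threshold_seconds:
            slow.append({
                "message_id": message_id,
                "seconds": elapsed,
                "size_bytes": len(payload),
            })
    return slow


def augment(payload):
    # Toy workload: larger payloads take longer to process.
    time.sleep(len(payload) / 1_000_000)


batch = [("msg-1", b"x" * 1_000), ("msg-2", b"x" * 200_000)]
for record in track_slow(batch, augment, threshold_seconds=0.1):
    print(record["message_id"], f"{record['seconds']:.2f}s", record["size_bytes"], "bytes")
```

Recording the payload size alongside the duration makes it easier to see whether slow messages share a property, such as unusually large images.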
Kernel Capability Validation
AI training may impose the strictest demands on message queues of any scenario: microsecond latency, TB-scale throughput, frequent scaling, strict reliability, and complex data processing. If RobustMQ's kernel can meet AI training needs, the design is validated and can support other, less demanding scenarios.
On latency: AI training needs microsecond message distribution. Each millisecond of latency, multiplied by millions of iterations, is significant waste. Rust's zero-GC shows clear advantage. Our tests: RobustMQ P99 latency stable under 5ms even at 100K messages/sec. Java-based queues under high load have GC-induced latency spikes—P99 can reach 50–100ms.
On throughput: a single large training job may need 10–50GB/s. Our single-node tests reached 120GB/s (on servers with high-speed network and NVMe SSD); 5-node cluster 400GB/s+. This satisfies AI training and other high-throughput scenarios like real-time log collection and stream analytics.
Elastic scaling was validated in AI training. Training clusters often change size—scale up during the day, down at night; spot instances can be preempted anytime. RobustMQ's compute-storage separation makes scaling simple. Compute nodes are stateless—starting a new node takes seconds; shutting down doesn't require data migration. Storage is independent. In tests we scaled from 5 to 20 nodes or 20 to 5 in 30 seconds; training transitioned smoothly without interruption.
On reliability: AI training can't tolerate data loss. Days of training lost to data issues is catastrophic. RobustMQ's multi-replica ensures messages aren't lost to single-point failure. Each message is replicated to 3 nodes by default; only when a majority confirm write do we return to the producer. Consumer at-least-once semantics prevent loss; with idempotent training code, correctness is guaranteed.
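The majority rule can be written down in a few lines. In this sketch each replica is a hypothetical callable that returns `True` after a durable write or raises `ConnectionError` when the node is unreachable; a real broker would send the writes in parallel and apply timeouts.

```python
def replicate(message, replicas):
    """Return True once a majority of replicas have confirmed the write."""
    acks = 0
    for write in replicas:
        try:
            if write(message):
                acks += 1
        except ConnectionError:
            pass  # an unreachable replica simply does not count toward the quorum
    return acks >= len(replicas) // 2 + 1


def healthy(message):
    return True


def down(message):
    raise ConnectionError("replica unreachable")


print(replicate(b"batch-42", [healthy, healthy, down]))  # True: 2 of 3 confirmed
print(replicate(b"batch-42", [healthy, down, down]))     # False: no majority
```

With 3 replicas the producer can still get an acknowledgment after one node fails, which matches the single-point-failure guarantee described above.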
In one test we intentionally shut down a RobustMQ node during training to simulate hardware failure. The cluster detected the failure, reassigned that node's data shards to others, and training recovered from the failure point—downtime under 10 seconds. This fault tolerance is critical in production.
Boundaries of Exploration
To be clear: AI scenarios are exploratory in RobustMQ's current strategy—about 10% of resources. Our main focus remains MQTT protocol development and kernel refinement. MQTT is our first calling card and must reach 100% completeness—that's RobustMQ's foundation.
AI exploration goals: build demo-capable features proving RobustMQ's technical viability; deeply serve a few AI companies to validate real needs and solution effectiveness; build influence in the AI community through technical articles and conference talks; accumulate experience and validate kernel generality for possible future in-depth development.
We'll implement a basic AI training scenario adaptation layer with data preloading, high-speed distribution, gradient sync, and other core features. We'll cooperate with AI companies on POC validation, collecting real performance data and feedback. We'll write technical articles on AI training data challenges, publish benchmarks, and share at AI Infra conferences.
We won't pursue 100% AI scenario completeness, won't invest heavily in full productization, won't let AI delay MQTT development. AI's current value: demonstrate RobustMQ's technical capability, validate kernel generality, build "high-performance" brand in the tech community.
If validation shows strong AI demand, positive feedback, and sufficient market, we may make AI the second in-depth direction after MQTT reaches 100%—investing major resources like we did for MQTT.
If demand is weaker than expected or our solution isn't competitive enough, we'll shift to other protocols (e.g., Kafka) or scenarios. The exploration still adds value: we validated kernel capability in high-performance scenarios, built technical influence, attracted community attention. All beneficial for RobustMQ's long-term development.
From Exploration to Future
AI training data pipelines are just one part of AI. Inference services, data labeling, model management, MLOps workflows, edge AI—each has data flow and message communication needs.
AI inference needs low-latency request routing, distributing user requests to the right model instance and returning results. Online inference latency might be millisecond or sub-millisecond—stricter message queue requirements.
Data labeling manages large volumes of raw data, distributes to annotators, collects results, performs quality checks and consistency validation. Typical task distribution and result aggregation.
Model serving handles version management, canary releases, A/B testing. Different model versions map to different inference services; need intelligent routing and effect data collection.
MLOps workflows connect data prep, model training, evaluation, deployment—each stage possibly owned by different teams or systems, coordinated via messages.
Edge AI runs inference on resource-constrained edge devices—needs lightweight messaging and edge-cloud data sync.
These are future possibilities. RobustMQ's unified kernel design lets us reuse capabilities built in one scenario across others. We're currently focused on training exploration; we may extend to other AI stages and eventually cover data flow across the AI lifecycle.
But that's for later. Our current focus: bring MQTT to 100% while using AI to validate technology and build influence. The road ahead is long; we'll take it step by step.
