MongoDB Connector
Overview
The MongoDB connector is a data integration component provided by RobustMQ for bridging MQTT messages to MongoDB NoSQL database systems. This connector supports document storage, flexible data models, and horizontal scaling, suitable for IoT data storage, historical data analysis, unstructured data storage, and real-time data processing scenarios.
Configuration
Connector Configuration
The MongoDB connector uses the MongoDBConnectorConfig structure for configuration:
pub struct MongoDBConnectorConfig {
    pub host: String,                          // MongoDB server address
    pub port: u16,                             // MongoDB server port
    pub database: String,                      // Database name
    pub collection: String,                    // Collection name
    pub username: Option<String>,              // Username (optional)
    pub password: Option<String>,              // Password (optional)
    pub auth_source: Option<String>,           // Authentication database (optional)
    pub deployment_mode: MongoDBDeploymentMode, // Deployment mode
    pub replica_set_name: Option<String>,      // Replica set name (optional)
    pub enable_tls: bool,                      // Enable TLS/SSL
    pub max_pool_size: Option<u32>,            // Maximum pool size
    pub min_pool_size: Option<u32>,            // Minimum pool size
}
Deployment Modes
The MongoDB connector supports three deployment modes:
| Mode | Description | Use Case |
|---|---|---|
| single | Single instance | Development/test environments |
| replicaset | Replica set | Production (high availability) |
| sharded | Sharded cluster | Large-scale data storage |
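As a rough illustration of how these modes differ at the connection level, the sketch below maps the configuration fields onto standard MongoDB connection-string options (`authSource`, `replicaSet`, `tls`, `maxPoolSize`, `minPoolSize`). The helper name `buildMongoUri` is hypothetical, and how RobustMQ builds its connection internally is not described here, so treat this as an assumption rather than the connector's implementation.

```javascript
// Hypothetical helper: turns a connector config object into a MongoDB URI.
function buildMongoUri(cfg) {
  // Percent-encode credentials so characters such as '@' or ':' stay valid.
  const auth = cfg.username
    ? `${encodeURIComponent(cfg.username)}:${encodeURIComponent(cfg.password ?? "")}@`
    : "";
  const params = new URLSearchParams();
  if (cfg.username) params.set("authSource", cfg.auth_source ?? "admin");
  // Only replica set mode needs the set name on the connection string.
  if (cfg.deployment_mode === "replicaset" && cfg.replica_set_name) {
    params.set("replicaSet", cfg.replica_set_name);
  }
  if (cfg.enable_tls) params.set("tls", "true");
  if (cfg.max_pool_size != null) params.set("maxPoolSize", String(cfg.max_pool_size));
  if (cfg.min_pool_size != null) params.set("minPoolSize", String(cfg.min_pool_size));
  const query = params.toString();
  return `mongodb://${auth}${cfg.host}:${cfg.port ?? 27017}/${query ? "?" + query : ""}`;
}

const replicaSetUri = buildMongoUri({
  host: "mongodb-0.example.com",
  port: 27017,
  username: "iot_user",
  password: "iot_pass",
  auth_source: "admin",
  deployment_mode: "replicaset",
  replica_set_name: "rs0",
  enable_tls: true,
  max_pool_size: 100,
});
console.log(replicaSetUri);
```

In this sketch a sharded cluster needs no extra option: the client connects to a `mongos` router the same way it connects to a single server.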
Configuration Parameters
| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| host | String | Yes | MongoDB server address | localhost or mongodb.example.com |
| port | u16 | No | MongoDB server port, default 27017 | 27017 |
| database | String | Yes | Database name | mqtt_data |
| collection | String | Yes | Collection name | mqtt_messages |
| username | String | No | Username | mqtt_user |
| password | String | No | Password | mqtt_pass |
| auth_source | String | No | Authentication database, default admin | admin |
| deployment_mode | String | No | Deployment mode, default single | single/replicaset/sharded |
| replica_set_name | String | No | Replica set name (required for replicaset mode) | rs0 |
| enable_tls | bool | No | Enable TLS/SSL, default false | true or false |
| max_pool_size | u32 | No | Maximum pool size | 100 |
| min_pool_size | u32 | No | Minimum pool size | 10 |
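The defaults and the `replica_set_name` requirement from the table can be sketched as a small validation step. `normalizeConfig` is a hypothetical helper, and the check that `min_pool_size` does not exceed `max_pool_size` is an added assumption, not documented connector behavior.

```javascript
const DEPLOYMENT_MODES = ["single", "replicaset", "sharded"];

// Hypothetical helper: applies the documented defaults and collects errors.
function normalizeConfig(input) {
  const errors = [];
  // host, database, and collection are marked "Required: Yes" in the table.
  for (const field of ["host", "database", "collection"]) {
    if (typeof input[field] !== "string" || input[field].length === 0) {
      errors.push(`${field} is required`);
    }
  }
  // Defaults come first so that explicit values in `input` override them.
  const cfg = {
    port: 27017,
    auth_source: "admin",
    deployment_mode: "single",
    enable_tls: false,
    ...input,
  };
  if (!DEPLOYMENT_MODES.includes(cfg.deployment_mode)) {
    errors.push(`deployment_mode must be one of ${DEPLOYMENT_MODES.join(", ")}`);
  }
  if (cfg.deployment_mode === "replicaset" && !cfg.replica_set_name) {
    errors.push("replica_set_name is required for replicaset mode");
  }
  // Assumption: a minimum above the maximum is treated as a mistake.
  if (cfg.max_pool_size != null && cfg.min_pool_size != null &&
      cfg.min_pool_size > cfg.max_pool_size) {
    errors.push("min_pool_size must not exceed max_pool_size");
  }
  return { cfg, errors };
}

const checked = normalizeConfig({
  host: "localhost",
  database: "mqtt_data",
  collection: "mqtt_messages",
});
console.log(checked.cfg, checked.errors);
```

Returning the full list of errors rather than throwing on the first one lets a caller report every problem in a configuration at once.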
Configuration Examples
JSON Configuration Format
Basic Configuration (Single Mode):
{
  "host": "localhost",
  "port": 27017,
  "database": "mqtt_data",
  "collection": "mqtt_messages",
  "username": "mqtt_user",
  "password": "mqtt_pass",
  "auth_source": "admin",
  "deployment_mode": "single",
  "enable_tls": false
}
Replica Set Configuration:
{
  "host": "mongodb-0.mongodb.svc.cluster.local",
  "port": 27017,
  "database": "iot_data",
  "collection": "sensor_readings",
  "username": "iot_user",
  "password": "iot_pass",
  "auth_source": "admin",
  "deployment_mode": "replicaset",
  "replica_set_name": "rs0",
  "enable_tls": true,
  "max_pool_size": 100,
  "min_pool_size": 10
}
No Authentication Configuration (Development):
{
  "host": "localhost",
  "port": 27017,
  "database": "test_db",
  "collection": "test_messages",
  "deployment_mode": "single",
  "enable_tls": false
}
Complete Connector Configuration
{
  "cluster_name": "default",
  "connector_name": "mongodb_connector_01",
  "connector_type": "MongoDB",
  "config": "{\"host\": \"localhost\", \"port\": 27017, \"database\": \"mqtt_data\", \"collection\": \"mqtt_messages\", \"username\": \"mqtt_user\", \"password\": \"mqtt_pass\", \"deployment_mode\": \"single\"}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": null,
  "create_time": 1640995200,
  "update_time": 1640995200
}
Data Model
Document Structure
The MongoDB connector converts MQTT messages to BSON documents for storage, preserving the complete message structure:
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),  // Auto-generated by MongoDB
  "offset": 12345,                              // Message offset
  "header": [                                   // Message headers
    {
      "name": "topic",
      "value": "sensor/temperature"
    },
    {
      "name": "qos",
      "value": "1"
    }
  ],
  "key": "sensor_001",                          // Message key (client ID)
  "data": [123, 34, 116, 101, 109, 112, ...],   // Message data (byte array)
  "tags": ["sensor", "temperature"],            // Message tags
  "timestamp": 1640995200,                      // Message timestamp (seconds)
  "crc_num": 1234567890                         // CRC checksum
}
Field Description
| Field | Type | Description | Index Recommendation |
|---|---|---|---|
| _id | ObjectId | MongoDB document unique identifier | Auto-indexed |
| offset | Number | Message offset | Recommended |
| header | Array | Message header information array | - |
| key | String | Message key (usually client ID) | Recommended |
| data | Binary | Message data (byte array) | - |
| tags | Array | Message tags array | Recommended |
| timestamp | Number | Message timestamp (seconds) | Recommended |
| crc_num | Number | CRC checksum | - |
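Because `data` holds raw bytes and `header` is an array of name/value pairs, reading a stored document usually takes a small decoding step. The sketch below assumes the document reaches application code with `data` as a plain array of byte values, as in the example document, and that the payload is UTF-8 text. `headerValue` and `decodePayload` are hypothetical helpers, and the sample payload is invented for illustration (only its first six bytes match the example document).

```javascript
// Look up a header entry by name in the name/value array.
function headerValue(doc, name) {
  const entry = (doc.header ?? []).find((h) => h.name === name);
  return entry ? entry.value : undefined;
}

// Decode the byte array as UTF-8 and try to parse it as JSON.
function decodePayload(doc) {
  const text = new TextDecoder("utf-8").decode(Uint8Array.from(doc.data));
  try {
    return { text, json: JSON.parse(text) };
  } catch {
    return { text, json: null }; // payload is text but not JSON
  }
}

const sampleDoc = {
  header: [
    { name: "topic", value: "sensor/temperature" },
    { name: "qos", value: "1" },
  ],
  key: "sensor_001",
  // Invented sample: the UTF-8 bytes of {"temp":25.5}
  data: [123, 34, 116, 101, 109, 112, 34, 58, 50, 53, 46, 53, 125],
  timestamp: 1640995200,
};

console.log(headerValue(sampleDoc, "topic"));
console.log(decodePayload(sampleDoc).json);
```

If a driver returns `data` as a BSON binary object instead of an array, it would need to be converted to bytes before this decoding step.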
Index Recommendations
To improve query performance, it's recommended to create the following indexes:
// 1. Timestamp query index
db.mqtt_messages.createIndex({ "timestamp": -1 })
// 2. Client ID query index
db.mqtt_messages.createIndex({ "key": 1 })
// 3. Tags query index
db.mqtt_messages.createIndex({ "tags": 1 })
// 4. Compound index (client + timestamp)
db.mqtt_messages.createIndex({ "key": 1, "timestamp": -1 })
// 5. Offset query index
db.mqtt_messages.createIndex({ "offset": 1 })
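// 6. TTL index (auto-delete expired data)
// Note: TTL expiry applies only to fields that hold a BSON Date (or an array of Dates).
// A numeric timestamp in seconds is skipped, so a Date-typed field is needed for this to take effect.
db.mqtt_messages.createIndex(
  { "timestamp": 1 },
  { expireAfterSeconds: 2592000 } // Auto-delete after 30 days
)
Using robust-ctl to Create MongoDB Connector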
Basic Syntax
Use the robust-ctl command-line tool to create and manage MongoDB connectors:
robust-ctl mqtt connector create \
  --connector-name <connector-name> \
  --connector-type <connector-type> \
  --config <configuration> \
  --topic-id <topic-id>
Creating MongoDB Connector
1. Basic Creation Command
# Create MongoDB connector
robust-ctl mqtt connector create \
--connector-name "mongodb_connector_01" \
--connector-type "MongoDB" \
--config '{"host": "localhost", "port": 27017, "database": "mqtt_data", "collection": "mqtt_messages", "username": "mqtt_user", "password": "mqtt_pass", "deployment_mode": "single"}' \
--topic-id "sensor/data"
2. Parameter Description
| Parameter | Description | Example |
|---|---|---|
| --connector-name | Connector name, must be unique | mongodb_connector_01 |
| --connector-type | Connector type, must be MongoDB | MongoDB |
| --config | JSON format configuration | {"host": "localhost", ...} |
| --topic-id | MQTT topic to monitor | sensor/data |
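When the command is generated by a script, serializing the configuration with `JSON.stringify` avoids hand-written quoting mistakes in the `--config` value. The sketch below only assembles the flags listed in the table. `buildCreateArgs` and `shellQuote` are hypothetical helpers, and the quoting rule assumes a POSIX shell.

```javascript
// Assemble the argument list for `robust-ctl mqtt connector create`.
function buildCreateArgs(connectorName, config, topic) {
  return [
    "mqtt", "connector", "create",
    "--connector-name", connectorName,
    "--connector-type", "MongoDB",
    "--config", JSON.stringify(config),
    "--topic-id", topic,
  ];
}

// POSIX-shell quoting: bare words pass through, everything else is wrapped
// in single quotes with embedded single quotes escaped as '\''.
function shellQuote(arg) {
  if (/^[A-Za-z0-9_.\/-]+$/.test(arg)) return arg;
  return "'" + arg.replace(/'/g, "'\\''") + "'";
}

const createArgs = buildCreateArgs(
  "mongodb_connector_01",
  { host: "localhost", port: 27017, database: "mqtt_data", collection: "mqtt_messages" },
  "sensor/data"
);
console.log(["robust-ctl", ...createArgs.map(shellQuote)].join(" "));
```

Passing `createArgs` directly to a process-spawning API as an argument array sidesteps shell quoting entirely. The quoted string is only needed when the command is printed or pasted into a terminal.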
3. Advanced Configuration Examples
Replica Set Configuration:
robust-ctl mqtt connector create \
--connector-name "mongodb_replicaset" \
--connector-type "MongoDB" \
--config '{"host": "mongodb-0.example.com", "port": 27017, "database": "iot_data", "collection": "sensor_data", "username": "iot_admin", "password": "secure_pass", "auth_source": "admin", "deployment_mode": "replicaset", "replica_set_name": "rs0", "enable_tls": true, "max_pool_size": 100}' \
--topic-id "iot/sensors/+/data"
High Performance Configuration:
robust-ctl mqtt connector create \
--connector-name "mongodb_high_perf" \
--connector-type "MongoDB" \
--config '{"host": "mongodb.local", "port": 27017, "database": "high_throughput", "collection": "messages", "username": "perf_user", "password": "perf_pass", "max_pool_size": 200, "min_pool_size": 50}' \
--topic-id "high/throughput/#"
Managing Connectors
1. List All Connectors
# List all connectors
robust-ctl mqtt connector list
# List specific connector
robust-ctl mqtt connector list --connector-name "mongodb_connector_01"
2. Delete Connector
# Delete specific connector
robust-ctl mqtt connector delete --connector-name "mongodb_connector_01"
Complete Operation Example
Scenario: Creating IoT Data Storage System
# 1. Create sensor data MongoDB connector
robust-ctl mqtt connector create \
--connector-name "iot_sensor_mongodb" \
--connector-type "MongoDB" \
--config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "sensor_readings", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin", "max_pool_size": 100}' \
--topic-id "iot/sensors/+/readings"
# 2. Create device status MongoDB connector
robust-ctl mqtt connector create \
--connector-name "device_status_mongodb" \
--connector-type "MongoDB" \
--config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "device_status", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin"}' \
--topic-id "iot/devices/+/status"
# 3. Create alarm message MongoDB connector
robust-ctl mqtt connector create \
--connector-name "alarm_mongodb" \
--connector-type "MongoDB" \
--config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "alarm_logs", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin"}' \
--topic-id "iot/alarms/#"
# 4. View created connectors
robust-ctl mqtt connector list
# 5. Test connector (publish test message)
robust-ctl mqtt publish \
--username "test_user" \
--password "test_pass" \
--topic "iot/sensors/temp_001/readings" \
--qos 1 \
--message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}'
MongoDB Deployment Examples
Docker Single Instance Deployment
# Start MongoDB service
docker run -d \
--name mongodb \
-p 27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=admin123 \
-v mongodb_data:/data/db \
mongo:7.0
# Wait for service to start
sleep 5
# Create database and user
# (the user is defined in mqtt_data, so set auth_source to mqtt_data when connecting as mqtt_user)
docker exec -it mongodb mongosh -u admin -p admin123 --authenticationDatabase admin --eval "
db = db.getSiblingDB('mqtt_data');
db.createUser({
  user: 'mqtt_user',
  pwd: 'mqtt_pass',
  roles: [{ role: 'readWrite', db: 'mqtt_data' }]
});
"
# Create collection and indexes
docker exec -it mongodb mongosh -u mqtt_user -p mqtt_pass --authenticationDatabase mqtt_data --eval "
db = db.getSiblingDB('mqtt_data');
db.createCollection('mqtt_messages');
db.mqtt_messages.createIndex({ 'timestamp': -1 });
db.mqtt_messages.createIndex({ 'key': 1 });
db.mqtt_messages.createIndex({ 'tags': 1 });
"
Docker Compose Deployment
version: '3.8'
services:
  mongodb:
    image: mongo:7.0
    container_name: mongodb
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin123
      MONGO_INITDB_DATABASE: mqtt_data
    volumes:
      - mongodb_data:/data/db
      - ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    # Runs as a standalone server; combining --replSet with the root-user
    # variables above would also require a keyFile for internal authentication
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
      interval: 30s
      timeout: 10s
      retries: 5
  robustmq:
    image: robustmq/robustmq:latest
    container_name: robustmq
    ports:
      - "1883:1883"
      - "8883:8883"
    depends_on:
      - mongodb
    environment:
      - MONGODB_HOST=mongodb
      - MONGODB_PORT=27017
volumes:
  mongodb_data:
mongo-init.js:
// Create database and user
db = db.getSiblingDB('mqtt_data');
db.createUser({
user: 'mqtt_user',
pwd: 'mqtt_pass',
roles: [
{ role: 'readWrite', db: 'mqtt_data' }
]
});
// Create collection
db.createCollection('mqtt_messages');
// Create indexes
db.mqtt_messages.createIndex({ 'timestamp': -1 });
db.mqtt_messages.createIndex({ 'key': 1 });
db.mqtt_messages.createIndex({ 'key': 1, 'timestamp': -1 });
db.mqtt_messages.createIndex({ 'tags': 1 });
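// Create TTL index (auto-delete after 30 days)
// Note: TTL expiry applies only to fields that hold a BSON Date; a numeric timestamp in seconds is skipped
db.mqtt_messages.createIndex(
  { 'timestamp': 1 },
  { expireAfterSeconds: 2592000 }
);
MongoDB Replica Set Deployment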
# Start replica set
docker-compose -f docker-compose-replica.yml up -d
# Initialize replica set
docker exec -it mongodb-0 mongosh --eval "
rs.initiate({
  _id: 'rs0',
  members: [
    { _id: 0, host: 'mongodb-0:27017' },
    { _id: 1, host: 'mongodb-1:27017' },
    { _id: 2, host: 'mongodb-2:27017' }
  ]
})
"
docker-compose-replica.yml:
version: '3.8'
services:
  mongodb-0:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27017:27017"
    volumes:
      - mongodb0_data:/data/db
  mongodb-1:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27018:27017"
    volumes:
      - mongodb1_data:/data/db
  mongodb-2:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27019:27017"
    volumes:
      - mongodb2_data:/data/db
volumes:
  mongodb0_data:
  mongodb1_data:
  mongodb2_data:
Query Examples
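The queries in this section compare against `timestamp` values stored as Unix seconds, so most of them reduce to a little integer arithmetic. The sketch below works through the same calculations in plain JavaScript. The helper names are hypothetical and exist only for illustration.

```javascript
const HOUR = 3600;            // seconds in one hour
const DAY = 24 * HOUR;        // 86400
const THIRTY_DAYS = 30 * DAY; // 2592000, the TTL value used in this document

// Convert a JavaScript Date to whole Unix seconds.
function toUnixSeconds(date) {
  return Math.floor(date.getTime() / 1000);
}

// Start of the hour containing ts: the same arithmetic as
// $subtract: ["$timestamp", { $mod: ["$timestamp", 3600] }].
function hourBucket(ts) {
  return ts - (ts % HOUR);
}

// UTC calendar day of ts, formatted like $dateToString with "%Y-%m-%d".
function dayString(ts) {
  return new Date(ts * 1000).toISOString().slice(0, 10);
}

console.log(hourBucket(1640995200 + 4000)); // 1640998800
console.log(dayString(1640995200));         // 2022-01-01
console.log(toUnixSeconds(new Date()) - HOUR); // cutoff for "the last hour"
```

Multiplying by 1000 before building a `Date` matters because JavaScript dates, like MongoDB's `$toDate`, count milliseconds rather than seconds.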
Basic Queries
// Query messages from the last 1 hour
db.mqtt_messages.find({
timestamp: { $gt: Math.floor(Date.now() / 1000) - 3600 }
}).sort({ timestamp: -1 })
// Query messages from specific client
db.mqtt_messages.find({
key: "sensor_001"
}).sort({ timestamp: -1 }).limit(100)
// Query messages with specific tag
db.mqtt_messages.find({
tags: "temperature"
})
// Query messages within time range
db.mqtt_messages.find({
timestamp: {
$gte: 1640995200,
$lt: 1641081600
}
})
Aggregation Queries
// Count messages per hour
db.mqtt_messages.aggregate([
{
$group: {
_id: {
$subtract: [
"$timestamp",
{ $mod: ["$timestamp", 3600] }
]
},
count: { $sum: 1 }
}
},
{ $sort: { _id: -1 } }
])
// Count messages by tag
db.mqtt_messages.aggregate([
{ $unwind: "$tags" },
{
$group: {
_id: "$tags",
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } }
])
// Count messages by client
db.mqtt_messages.aggregate([
{
$group: {
_id: "$key",
count: { $sum: 1 },
first_message: { $min: "$timestamp" },
last_message: { $max: "$timestamp" }
}
},
{ $sort: { count: -1 } }
])
// Count messages by day
db.mqtt_messages.aggregate([
{
$group: {
_id: {
$dateToString: {
format: "%Y-%m-%d",
date: { $toDate: { $multiply: ["$timestamp", 1000] } }
}
},
count: { $sum: 1 }
}
},
{ $sort: { _id: -1 } }
])
Advanced Queries
// Query messages with a specific header field
// ($elemMatch requires name and value to match within the same array element)
db.mqtt_messages.find({
  header: { $elemMatch: { name: "topic", value: { $regex: "^sensor/" } } }
})
// Use text search (requires text index)
db.mqtt_messages.createIndex({ "$**": "text" })
db.mqtt_messages.find({ $text: { $search: "temperature" } })
// Geospatial query (if data contains coordinates; $near requires a 2dsphere index on location)
db.mqtt_messages.find({
  location: {
    $near: {
      $geometry: { type: "Point", coordinates: [120.0, 30.0] },
      $maxDistance: 1000
    }
  }
})
Performance Optimization
Database Optimization
1. Index Optimization
// View current indexes
db.mqtt_messages.getIndexes()
// Analyze query performance
db.mqtt_messages.find({ key: "sensor_001" }).explain("executionStats")
// Drop unused indexes
db.mqtt_messages.dropIndex("index_name")
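// Create compound index for queries on key, timestamp, and tags
// (an index over the tags array is multikey and cannot cover queries on tags)
db.mqtt_messages.createIndex({ key: 1, timestamp: -1, tags: 1 })
2. Sharding Configuration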
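// Enable sharding
sh.enableSharding("mqtt_data")
// Option A: range-shard by timestamp
// (a monotonically increasing key routes new writes to a single shard)
sh.shardCollection(
  "mqtt_data.mqtt_messages",
  { timestamp: 1 }
)
// Option B: hash-shard by client ID
// (a collection has one shard key, so choose either option A or option B)
sh.shardCollection(
  "mqtt_data.mqtt_messages",
  { key: "hashed" }
)
3. Data Retention Policy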
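// Create TTL index (auto-delete data older than 30 days)
// TTL expiry applies only to fields that hold a BSON Date; a numeric
// timestamp in seconds is skipped unless a Date value is stored
db.mqtt_messages.createIndex(
  { timestamp: 1 },
  { expireAfterSeconds: 2592000 }
)
// Manually archive old data
// ($out replaces the contents of mqtt_messages_archive on every run;
// use $merge instead to append to an existing archive)
db.mqtt_messages.aggregate([
  { $match: { timestamp: { $lt: 1640995200 } } },
  { $out: "mqtt_messages_archive" }
])
// Delete archived data
db.mqtt_messages.deleteMany({ timestamp: { $lt: 1640995200 } })
Connector Optimization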
1. Connection Pool Configuration
{
  "max_pool_size": 100,
  "min_pool_size": 10
}
Configuration Recommendations:
- Low concurrency: max_pool_size=20, min_pool_size=5
- Medium concurrency: max_pool_size=50, min_pool_size=10
- High concurrency: max_pool_size=100+, min_pool_size=20
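These recommendations can be kept as a small lookup so that every connector configuration draws its pool sizes from one place. The tier names and the `withPoolSizes` helper are hypothetical, and the sketch uses 100 as the starting point for the "100+" high-concurrency tier.

```javascript
// Pool sizes per concurrency tier, taken from the recommendations above.
const POOL_TIERS = {
  low: { max_pool_size: 20, min_pool_size: 5 },
  medium: { max_pool_size: 50, min_pool_size: 10 },
  high: { max_pool_size: 100, min_pool_size: 20 },
};

// Return a copy of the config with the pool sizes for the given tier.
function withPoolSizes(config, tier) {
  const sizes = POOL_TIERS[tier];
  if (!sizes) throw new Error(`unknown tier: ${tier}`);
  return { ...config, ...sizes };
}

const tunedConfig = withPoolSizes(
  { host: "mongodb.local", database: "high_throughput", collection: "messages" },
  "medium"
);
console.log(tunedConfig);
```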
2. Batch Writes
The MongoDB connector automatically uses batch inserts (insert_many) to improve write performance.
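The idea behind batch inserts can be sketched as splitting a stream of documents into fixed-size groups, each written with one `insertMany` call instead of one call per document. The connector's actual batch size and flush policy are not documented here, so the value 100 and the `toBatches` helper are assumptions for illustration only.

```javascript
// Yield consecutive slices of `items`, each at most `batchSize` long.
function* toBatches(items, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  for (let i = 0; i < items.length; i += batchSize) {
    yield items.slice(i, i + batchSize);
  }
}

// 250 placeholder documents split into batches of 100.
const pendingDocs = Array.from({ length: 250 }, (_, i) => ({ offset: i }));
const batchSizes = [];
for (const batch of toBatches(pendingDocs, 100)) {
  // A real writer would pass `batch` to a single insertMany call here.
  batchSizes.push(batch.length);
}
console.log(batchSizes);
```

Fewer, larger writes reduce the number of round trips to the server, which is the main reason batching improves throughput.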
Monitoring and Troubleshooting
Log Monitoring
The connector outputs detailed operation logs:
INFO Successfully connected to MongoDB at localhost:27017
INFO Successfully inserted 100 documents into MongoDB collection 'mqtt_messages'
ERROR Failed to connect to MongoDB at localhost:27017: connection timeout
ERROR Failed to insert documents into MongoDB collection 'mqtt_messages': write concern error
MongoDB Monitoring Commands
// View database status
db.stats()
// View collection status
db.mqtt_messages.stats()
// View current operations
db.currentOp()
// Enable slow query profiling (records operations slower than 100 ms)
db.setProfilingLevel(1, { slowms: 100 })
// View slow queries recorded by the profiler
db.system.profile.find().sort({ ts: -1 }).limit(10)
Common Issues
1. Connection Failure
# Check MongoDB service status
docker exec mongodb mongosh --eval "db.adminCommand('ping')"
# Check network connection
telnet localhost 27017
# View MongoDB logs
docker logs mongodb
2. Authentication Failure
// Verify user credentials
db.auth("mqtt_user", "mqtt_pass")
// View user information
db.getUsers()
// Change user password
db.changeUserPassword("mqtt_user", "new_password")
3. Low Write Performance
// View operation counters (number of inserts, updates, deletes, ...)
db.serverStatus().opcounters
// View write latency
db.serverStatus().opLatencies.writes
// Check index efficiency
db.mqtt_messages.find({ key: "sensor_001" }).explain("executionStats")
// Optimize write concern level
db.mqtt_messages.insertOne(
  { ... },
  { writeConcern: { w: 1 } }
)
4. Disk Space Issues
// View database size
db.stats()
// Compact collection
db.runCommand({ compact: "mqtt_messages" })
// Delete old data
db.mqtt_messages.deleteMany({
timestamp: { $lt: Math.floor(Date.now() / 1000) - 2592000 }
})
Summary
The MongoDB connector is an important component of the RobustMQ data integration system, providing flexible and efficient document storage capabilities. With proper configuration and usage, it can meet various business requirements such as IoT data storage, historical data analysis, unstructured data storage, and real-time data processing.
The connector builds on MongoDB's document model, dynamic schema, and horizontal scaling, and is implemented in Rust. It supports single instance, replica set, and sharded cluster deployment modes, which makes it suitable for IoT data platforms and large-scale analytics systems.
Key Features
- ✅ Flexible Data Model: Document structure, no predefined schema required
- ✅ Horizontal Scaling: Supports sharded clusters for massive data
- ✅ High Availability: Replica set mode provides automatic failover
- ✅ Rich Query Capabilities: Supports complex queries, aggregations, and geospatial queries
- ✅ Batch Write Optimization: Automatic batch inserts improve write performance
- ✅ Data Lifecycle Management: TTL indexes automatically clean expired data
- ✅ Connection Pool Management: Smart connection pooling enhances concurrency performance
