MongoDB Connector

Overview

The MongoDB connector is a RobustMQ data integration component that bridges MQTT messages to MongoDB. It stores each message as a document, which allows flexible data models and horizontal scaling, and it suits IoT data storage, historical data analysis, unstructured data storage, and real-time data processing.

Configuration

Connector Configuration

The MongoDB connector uses the MongoDBConnectorConfig structure for configuration:

rust
pub struct MongoDBConnectorConfig {
    pub host: String,                        // MongoDB server address
    pub port: u16,                           // MongoDB server port
    pub database: String,                    // Database name
    pub collection: String,                  // Collection name
    pub username: Option<String>,            // Username (optional)
    pub password: Option<String>,            // Password (optional)
    pub auth_source: Option<String>,         // Authentication database (optional)
    pub deployment_mode: MongoDBDeploymentMode, // Deployment mode
    pub replica_set_name: Option<String>,    // Replica set name (optional)
    pub enable_tls: bool,                    // Enable TLS/SSL
    pub max_pool_size: Option<u32>,          // Maximum pool size
    pub min_pool_size: Option<u32>,          // Minimum pool size
}

Deployment Modes

The MongoDB connector supports three deployment modes:

| Mode | Description | Use Case |
| --- | --- | --- |
| single | Single instance | Development/test environments |
| replicaset | Replica set | Production (high availability) |
| sharded | Sharded cluster | Large-scale data storage |

Configuration Parameters

| Parameter | Type | Required | Description | Example |
| --- | --- | --- | --- | --- |
| host | String | Yes | MongoDB server address | localhost or mongodb.example.com |
| port | u16 | No | MongoDB server port, default 27017 | 27017 |
| database | String | Yes | Database name | mqtt_data |
| collection | String | Yes | Collection name | mqtt_messages |
| username | String | No | Username | mqtt_user |
| password | String | No | Password | mqtt_pass |
| auth_source | String | No | Authentication database, default admin | admin |
| deployment_mode | String | No | Deployment mode, default single | single/replicaset/sharded |
| replica_set_name | String | No | Replica set name (required for replicaset mode) | rs0 |
| enable_tls | bool | No | Enable TLS/SSL, default false | true or false |
| max_pool_size | u32 | No | Maximum pool size | 100 |
| min_pool_size | u32 | No | Minimum pool size | 10 |
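The defaults and constraints in the table can be checked before a configuration is submitted. The following is a minimal sketch in plain JavaScript; `normalizeMongoConfig` is a hypothetical helper written for illustration and is not part of RobustMQ, which may validate differently.

```javascript
// Hypothetical helper (not a RobustMQ API): applies the defaults listed in
// the parameter table and enforces the constraints the table states.
function normalizeMongoConfig(input) {
  // host, database and collection are marked "Required: Yes" in the table.
  for (const field of ["host", "database", "collection"]) {
    if (typeof input[field] !== "string" || input[field].length === 0) {
      throw new Error(`missing required field: ${field}`);
    }
  }

  // Defaults from the table; explicit values in `input` override them.
  const config = {
    port: 27017,
    auth_source: "admin",
    deployment_mode: "single",
    enable_tls: false,
    ...input,
  };

  const modes = ["single", "replicaset", "sharded"];
  if (!modes.includes(config.deployment_mode)) {
    throw new Error(`unknown deployment_mode: ${config.deployment_mode}`);
  }
  // replica_set_name is required only in replicaset mode.
  if (config.deployment_mode === "replicaset" && !config.replica_set_name) {
    throw new Error("replica_set_name is required for replicaset mode");
  }
  return config;
}

const normalized = normalizeMongoConfig({
  host: "localhost",
  database: "mqtt_data",
  collection: "mqtt_messages",
});
console.log(normalized.port, normalized.deployment_mode); // 27017 single
```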

Configuration Examples

JSON Configuration Format

Basic Configuration (Single Mode):

json
{
  "host": "localhost",
  "port": 27017,
  "database": "mqtt_data",
  "collection": "mqtt_messages",
  "username": "mqtt_user",
  "password": "mqtt_pass",
  "auth_source": "admin",
  "deployment_mode": "single",
  "enable_tls": false
}

Replica Set Configuration:

json
{
  "host": "mongodb-0.mongodb.svc.cluster.local",
  "port": 27017,
  "database": "iot_data",
  "collection": "sensor_readings",
  "username": "iot_user",
  "password": "iot_pass",
  "auth_source": "admin",
  "deployment_mode": "replicaset",
  "replica_set_name": "rs0",
  "enable_tls": true,
  "max_pool_size": 100,
  "min_pool_size": 10
}
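To see how these fields relate to a standard MongoDB connection string, the sketch below maps them onto the usual URI options (`authSource`, `replicaSet`, `tls`, `maxPoolSize`, `minPoolSize`). `buildMongoUri` is a hypothetical illustration; how the connector actually builds its client options is not shown in this document.

```javascript
// Hypothetical helper: maps connector fields onto a standard MongoDB
// connection string. This mirrors the URI format only; it is not the
// connector's own implementation.
function buildMongoUri(config) {
  // Percent-encode credentials so characters such as "@" or ":" stay valid.
  const credentials = config.username
    ? `${encodeURIComponent(config.username)}:${encodeURIComponent(config.password ?? "")}@`
    : "";

  const params = new URLSearchParams();
  if (config.username) params.set("authSource", config.auth_source ?? "admin");
  if (config.deployment_mode === "replicaset" && config.replica_set_name) {
    params.set("replicaSet", config.replica_set_name);
  }
  if (config.enable_tls) params.set("tls", "true");
  if (config.max_pool_size != null) params.set("maxPoolSize", String(config.max_pool_size));
  if (config.min_pool_size != null) params.set("minPoolSize", String(config.min_pool_size));

  const query = params.toString();
  const port = config.port ?? 27017;
  return `mongodb://${credentials}${config.host}:${port}/${query ? "?" + query : ""}`;
}

console.log(buildMongoUri({
  host: "mongodb-0.mongodb.svc.cluster.local",
  port: 27017,
  username: "iot_user",
  password: "iot_pass",
  auth_source: "admin",
  deployment_mode: "replicaset",
  replica_set_name: "rs0",
  enable_tls: true,
  max_pool_size: 100,
  min_pool_size: 10,
}));
```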

No Authentication Configuration (Development):

json
{
  "host": "localhost",
  "port": 27017,
  "database": "test_db",
  "collection": "test_messages",
  "deployment_mode": "single",
  "enable_tls": false
}

Complete Connector Configuration

json
{
  "cluster_name": "default",
  "connector_name": "mongodb_connector_01",
  "connector_type": "MongoDB",
  "config": "{\"host\": \"localhost\", \"port\": 27017, \"database\": \"mqtt_data\", \"collection\": \"mqtt_messages\", \"username\": \"mqtt_user\", \"password\": \"mqtt_pass\", \"deployment_mode\": \"single\"}",
  "topic_name": "sensor/data",
  "status": "Idle",
  "broker_id": null,
  "create_time": 1640995200,
  "update_time": 1640995200
}
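Note that `config` in the record above is a JSON-encoded string, not a nested object. A small sketch of producing and reading that field follows; `buildConnectorRecord` is a hypothetical helper, and it fills in only the fields shown in the example.

```javascript
// Hypothetical helper: builds a record shaped like the example above.
// The inner MongoDB settings are serialized into the `config` string.
function buildConnectorRecord(connectorName, topicName, mongoConfig) {
  return {
    cluster_name: "default",
    connector_name: connectorName,
    connector_type: "MongoDB",
    config: JSON.stringify(mongoConfig), // nested JSON travels as a string
    topic_name: topicName,
  };
}

const record = buildConnectorRecord("mongodb_connector_01", "sensor/data", {
  host: "localhost",
  port: 27017,
  database: "mqtt_data",
  collection: "mqtt_messages",
  deployment_mode: "single",
});

console.log(typeof record.config);               // string
console.log(JSON.parse(record.config).database); // mqtt_data
```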

Data Model

Document Structure

The MongoDB connector converts MQTT messages to BSON documents for storage, preserving the complete message structure:

javascript
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),  // Auto-generated by MongoDB
  "offset": 12345,                              // Message offset
  "header": [                                   // Message headers
    {
      "name": "topic",
      "value": "sensor/temperature"
    },
    {
      "name": "qos",
      "value": "1"
    }
  ],
  "key": "sensor_001",                          // Message key (client ID)
  "data": [123, 34, 116, 101, 109, 112, ...],   // Message data (byte array)
  "tags": ["sensor", "temperature"],            // Message tags
  "timestamp": 1640995200,                      // Message timestamp (seconds)
  "crc_num": 1234567890                         // CRC checksum
}

Field Description

| Field | Type | Description | Index Recommendation |
| --- | --- | --- | --- |
| _id | ObjectId | MongoDB document unique identifier | Auto-indexed |
| offset | Number | Message offset | Recommended |
| header | Array | Message header information array | - |
| key | String | Message key (usually client ID) | Recommended |
| data | Binary | Message data (byte array) | - |
| tags | Array | Message tags array | Recommended |
| timestamp | Number | Message timestamp (seconds) | Recommended |
| crc_num | Number | CRC checksum | - |
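Because `data` holds the raw payload bytes, a consumer has to decode it before reading the original message. The sketch below assumes the payload is UTF-8 text and that `data` arrives as an array of byte values, as in the sample document; `decodePayload` is a hypothetical helper, and a driver may instead return a binary wrapper type that needs converting first.

```javascript
// Hypothetical helper: turns the `data` byte array back into text and,
// when the text is valid JSON, into a parsed value.
function decodePayload(bytes) {
  const text = new TextDecoder("utf-8").decode(Uint8Array.from(bytes));
  try {
    return { text, json: JSON.parse(text) };
  } catch {
    return { text, json: null }; // payload was not JSON
  }
}

// Encode a sample payload to show the byte values it produces.
const sampleBytes = Array.from(new TextEncoder().encode('{"temperature": 25.5}'));
console.log(sampleBytes.slice(0, 6));                      // [ 123, 34, 116, 101, 109, 112 ]
console.log(decodePayload(sampleBytes).json.temperature);  // 25.5
```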

Index Recommendations

To improve query performance, it's recommended to create the following indexes:

javascript
// 1. Timestamp query index
db.mqtt_messages.createIndex({ "timestamp": -1 })

// 2. Client ID query index
db.mqtt_messages.createIndex({ "key": 1 })

// 3. Tags query index
db.mqtt_messages.createIndex({ "tags": 1 })

// 4. Compound index (client + timestamp)
db.mqtt_messages.createIndex({ "key": 1, "timestamp": -1 })

// 5. Offset query index
db.mqtt_messages.createIndex({ "offset": 1 })

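// 6. TTL index (auto-delete expired data)
// Note: TTL indexes only expire documents whose indexed field is a BSON Date
// (or an array of Dates). A numeric `timestamp` in seconds will not trigger expiry.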
db.mqtt_messages.createIndex(
  { "timestamp": 1 },
  { expireAfterSeconds: 2592000 }  // Auto-delete after 30 days
)
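MongoDB TTL indexes act on Date-typed fields, while the document structure above stores `timestamp` as a number of seconds. The sketch below shows the arithmetic behind `expireAfterSeconds` and one way to derive a Date from the numeric timestamp. The `created_at` field name is an assumption made for illustration; the connector does not write such a field according to this document.

```javascript
const SECONDS_PER_DAY = 24 * 60 * 60;

// Converts a retention period in days to the value used for expireAfterSeconds.
function ttlSeconds(days) {
  return days * SECONDS_PER_DAY;
}

// Hypothetical transformation: adds a Date-typed `created_at` field derived
// from the numeric `timestamp` (seconds since the Unix epoch).
function withExpiryDate(doc) {
  return { ...doc, created_at: new Date(doc.timestamp * 1000) };
}

console.log(ttlSeconds(30)); // 2592000
console.log(withExpiryDate({ timestamp: 1640995200 }).created_at.toISOString());
// 2022-01-01T00:00:00.000Z
```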

Using robust-ctl to Create MongoDB Connector

Basic Syntax

Use the robust-ctl command-line tool to create and manage MongoDB connectors:

bash
robust-ctl mqtt connector create \
  --connector-name <connector-name> \
  --connector-type <connector-type> \
  --config <configuration> \
  --topic-id <topic-id>

Creating MongoDB Connector

1. Basic Creation Command

bash
# Create MongoDB connector
robust-ctl mqtt connector create \
  --connector-name "mongodb_connector_01" \
  --connector-type "MongoDB" \
  --config '{"host": "localhost", "port": 27017, "database": "mqtt_data", "collection": "mqtt_messages", "username": "mqtt_user", "password": "mqtt_pass", "deployment_mode": "single"}' \
  --topic-id "sensor/data"

2. Parameter Description

| Parameter | Description | Example |
| --- | --- | --- |
| --connector-name | Connector name, must be unique | mongodb_connector_01 |
| --connector-type | Connector type, must be MongoDB | MongoDB |
| --config | JSON format configuration | {"host": "localhost", ...} |
| --topic-id | MQTT topic to monitor | sensor/data |

3. Advanced Configuration Examples

Replica Set Configuration:

bash
robust-ctl mqtt connector create \
  --connector-name "mongodb_replicaset" \
  --connector-type "MongoDB" \
  --config '{"host": "mongodb-0.example.com", "port": 27017, "database": "iot_data", "collection": "sensor_data", "username": "iot_admin", "password": "secure_pass", "auth_source": "admin", "deployment_mode": "replicaset", "replica_set_name": "rs0", "enable_tls": true, "max_pool_size": 100}' \
  --topic-id "iot/sensors/+/data"

High Performance Configuration:

bash
robust-ctl mqtt connector create \
  --connector-name "mongodb_high_perf" \
  --connector-type "MongoDB" \
  --config '{"host": "mongodb.local", "port": 27017, "database": "high_throughput", "collection": "messages", "username": "perf_user", "password": "perf_pass", "max_pool_size": 200, "min_pool_size": 50}' \
  --topic-id "high/throughput/#"

Managing Connectors

1. List All Connectors

bash
# List all connectors
robust-ctl mqtt connector list

# List specific connector
robust-ctl mqtt connector list --connector-name "mongodb_connector_01"

2. Delete Connector

bash
# Delete specific connector
robust-ctl mqtt connector delete --connector-name "mongodb_connector_01"

Complete Operation Example

Scenario: Creating IoT Data Storage System

bash
# 1. Create sensor data MongoDB connector
robust-ctl mqtt connector create \
  --connector-name "iot_sensor_mongodb" \
  --connector-type "MongoDB" \
  --config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "sensor_readings", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin", "max_pool_size": 100}' \
  --topic-id "iot/sensors/+/readings"

# 2. Create device status MongoDB connector
robust-ctl mqtt connector create \
  --connector-name "device_status_mongodb" \
  --connector-type "MongoDB" \
  --config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "device_status", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin"}' \
  --topic-id "iot/devices/+/status"

# 3. Create alarm message MongoDB connector
robust-ctl mqtt connector create \
  --connector-name "alarm_mongodb" \
  --connector-type "MongoDB" \
  --config '{"host": "mongodb.iot.local", "port": 27017, "database": "iot_platform", "collection": "alarm_logs", "username": "iot_writer", "password": "iot_pass_2023", "auth_source": "admin"}' \
  --topic-id "iot/alarms/#"

# 4. View created connectors
robust-ctl mqtt connector list

# 5. Test connector (publish test message)
robust-ctl mqtt publish \
  --username "test_user" \
  --password "test_pass" \
  --topic "iot/sensors/temp_001/readings" \
  --qos 1 \
  --message '{"temperature": 25.5, "humidity": 60, "timestamp": 1640995200}'
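The test topic `iot/sensors/temp_001/readings` is expected to reach the first connector because it matches the filter `iot/sensors/+/readings`. The sketch below illustrates standard MQTT wildcard matching (`+` for one level, `#` for all remaining levels). It assumes the connector's `--topic-id` follows these semantics, which this document implies but does not state, and it ignores the special handling of topics that start with `$`.

```javascript
// Minimal MQTT topic filter matcher (illustrative, not RobustMQ code).
function topicMatches(filter, topic) {
  const filterLevels = filter.split("/");
  const topicLevels = topic.split("/");
  for (let i = 0; i < filterLevels.length; i++) {
    // "#" matches the parent level and everything below it.
    if (filterLevels[i] === "#") return true;
    if (i >= topicLevels.length) return false;
    // "+" matches exactly one level; anything else must be equal.
    if (filterLevels[i] !== "+" && filterLevels[i] !== topicLevels[i]) return false;
  }
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches("iot/sensors/+/readings", "iot/sensors/temp_001/readings")); // true
console.log(topicMatches("iot/devices/+/status", "iot/sensors/temp_001/readings"));   // false
console.log(topicMatches("iot/alarms/#", "iot/alarms/fire/floor1"));                  // true
```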

MongoDB Deployment Examples

Docker Single Instance Deployment

bash
# Start MongoDB service
docker run -d \
  --name mongodb \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=admin123 \
  -v mongodb_data:/data/db \
  mongo:7.0

# Wait for service to start
sleep 5

# Create database and user
# (the user is created in mqtt_data, so mqtt_data is its authentication database;
#  set auth_source to mqtt_data when the connector logs in as this user)
docker exec -it mongodb mongosh -u admin -p admin123 --authenticationDatabase admin --eval "
  db = db.getSiblingDB('mqtt_data');
  db.createUser({
    user: 'mqtt_user',
    pwd: 'mqtt_pass',
    roles: [{ role: 'readWrite', db: 'mqtt_data' }]
  });
"

# Create collection and indexes
docker exec -it mongodb mongosh -u mqtt_user -p mqtt_pass --authenticationDatabase mqtt_data --eval "
  db = db.getSiblingDB('mqtt_data');
  db.createCollection('mqtt_messages');
  db.mqtt_messages.createIndex({ 'timestamp': -1 });
  db.mqtt_messages.createIndex({ 'key': 1 });
  db.mqtt_messages.createIndex({ 'tags': 1 });
"

Docker Compose Deployment

yaml
version: '3.8'

services:
  mongodb:
    image: mongo:7.0
    container_name: mongodb
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: admin123
      MONGO_INITDB_DATABASE: mqtt_data
    volumes:
      - mongodb_data:/data/db
      - ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
    healthcheck:
      test: echo 'db.runCommand("ping").ok' | mongosh localhost:27017/test --quiet
      interval: 30s
      timeout: 10s
      retries: 5

  robustmq:
    image: robustmq/robustmq:latest
    container_name: robustmq
    ports:
      - "1883:1883"
      - "8883:8883"
    depends_on:
      - mongodb
    environment:
      - MONGODB_HOST=mongodb
      - MONGODB_PORT=27017

volumes:
  mongodb_data:

mongo-init.js:

javascript
// Create database and user
db = db.getSiblingDB('mqtt_data');

db.createUser({
  user: 'mqtt_user',
  pwd: 'mqtt_pass',
  roles: [
    { role: 'readWrite', db: 'mqtt_data' }
  ]
});

// Create collection
db.createCollection('mqtt_messages');

// Create indexes
db.mqtt_messages.createIndex({ 'timestamp': -1 });
db.mqtt_messages.createIndex({ 'key': 1 });
db.mqtt_messages.createIndex({ 'key': 1, 'timestamp': -1 });
db.mqtt_messages.createIndex({ 'tags': 1 });

// Create TTL index (auto-delete after 30 days)
// Note: expiry only applies when the indexed field is a BSON Date, not a numeric timestamp
db.mqtt_messages.createIndex(
  { 'timestamp': 1 },
  { expireAfterSeconds: 2592000 }
);

MongoDB Replica Set Deployment

bash
# Start replica set
docker-compose -f docker-compose-replica.yml up -d

# Initialize replica set
docker-compose -f docker-compose-replica.yml exec mongodb-0 mongosh --eval "
  rs.initiate({
    _id: 'rs0',
    members: [
      { _id: 0, host: 'mongodb-0:27017' },
      { _id: 1, host: 'mongodb-1:27017' },
      { _id: 2, host: 'mongodb-2:27017' }
    ]
  })
"

docker-compose-replica.yml:

yaml
version: '3.8'

services:
  mongodb-0:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27017:27017"
    volumes:
      - mongodb0_data:/data/db

  mongodb-1:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27018:27017"
    volumes:
      - mongodb1_data:/data/db

  mongodb-2:
    image: mongo:7.0
    command: mongod --replSet rs0
    ports:
      - "27019:27017"
    volumes:
      - mongodb2_data:/data/db

volumes:
  mongodb0_data:
  mongodb1_data:
  mongodb2_data:

Query Examples

Basic Queries

javascript
// Query messages from the last 1 hour
db.mqtt_messages.find({
  timestamp: { $gt: Math.floor(Date.now() / 1000) - 3600 }
}).sort({ timestamp: -1 })

// Query messages from specific client
db.mqtt_messages.find({
  key: "sensor_001"
}).sort({ timestamp: -1 }).limit(100)

// Query messages with specific tag
db.mqtt_messages.find({
  tags: "temperature"
})

// Query messages within time range
db.mqtt_messages.find({
  timestamp: {
    $gte: 1640995200,
    $lt: 1641081600
  }
})
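The literal bounds in the time-range query are Unix timestamps in seconds. A minimal sketch of deriving such a filter from ISO 8601 dates follows; `timeRangeFilter` is a hypothetical helper, and the resulting object can be passed to `find()` as shown above.

```javascript
// Converts an ISO 8601 string to whole seconds since the Unix epoch.
function toEpochSeconds(iso) {
  return Math.floor(Date.parse(iso) / 1000);
}

// Hypothetical helper: builds a half-open [start, end) filter on `timestamp`.
function timeRangeFilter(startIso, endIso) {
  return {
    timestamp: { $gte: toEpochSeconds(startIso), $lt: toEpochSeconds(endIso) },
  };
}

console.log(JSON.stringify(timeRangeFilter("2022-01-01T00:00:00Z", "2022-01-02T00:00:00Z")));
// {"timestamp":{"$gte":1640995200,"$lt":1641081600}}
```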

Aggregation Queries

javascript
// Count messages per hour
db.mqtt_messages.aggregate([
  {
    $group: {
      _id: {
        $subtract: [
          "$timestamp",
          { $mod: ["$timestamp", 3600] }
        ]
      },
      count: { $sum: 1 }
    }
  },
  { $sort: { _id: -1 } }
])

// Count messages by tag
db.mqtt_messages.aggregate([
  { $unwind: "$tags" },
  {
    $group: {
      _id: "$tags",
      count: { $sum: 1 }
    }
  },
  { $sort: { count: -1 } }
])

// Count messages by client
db.mqtt_messages.aggregate([
  {
    $group: {
      _id: "$key",
      count: { $sum: 1 },
      first_message: { $min: "$timestamp" },
      last_message: { $max: "$timestamp" }
    }
  },
  { $sort: { count: -1 } }
])

// Count messages by day
db.mqtt_messages.aggregate([
  {
    $group: {
      _id: {
        $dateToString: {
          format: "%Y-%m-%d",
          date: { $toDate: { $multiply: ["$timestamp", 1000] } }
        }
      },
      count: { $sum: 1 }
    }
  },
  { $sort: { _id: -1 } }
])
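The hourly aggregation groups on `timestamp - (timestamp mod 3600)`, which rounds each timestamp down to the start of its hour. The same arithmetic in plain JavaScript, as a sketch for checking the bucketing on a few sample values:

```javascript
// Reproduces the hourly $group logic on an in-memory list of timestamps
// (seconds since the Unix epoch). Results are sorted newest bucket first.
function countPerHour(timestamps) {
  const counts = new Map();
  for (const ts of timestamps) {
    const bucket = ts - (ts % 3600); // start of the hour containing ts
    counts.set(bucket, (counts.get(bucket) ?? 0) + 1);
  }
  return [...counts.entries()]
    .sort((a, b) => b[0] - a[0])
    .map(([_id, count]) => ({ _id, count }));
}

console.log(countPerHour([1640995200, 1640995260, 1640998800, 1640998799]));
// [ { _id: 1640998800, count: 1 }, { _id: 1640995200, count: 3 } ]
```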

Advanced Queries

javascript
// Query messages with specific header field
// ($elemMatch requires name and value to match within the same header entry)
db.mqtt_messages.find({
  header: {
    $elemMatch: { name: "topic", value: { $regex: "^sensor/" } }
  }
})

// Use text search (requires text index)
db.mqtt_messages.createIndex({ "$**": "text" })
db.mqtt_messages.find({ $text: { $search: "temperature" } })

// Geospatial query (if documents contain a GeoJSON `location` field;
// $near requires a 2dsphere index on that field)
db.mqtt_messages.find({
  location: {
    $near: {
      $geometry: { type: "Point", coordinates: [120.0, 30.0] },
      $maxDistance: 1000
    }
  }
})
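When filtering on `header` entries, it helps to distinguish two readings of a query on an array of objects: "some entry has this name and some entry has this value" versus "one entry has both". With dot notation, MongoDB applies each condition to the array independently, while `$elemMatch` ties them to a single element. The sketch below reproduces both semantics in plain JavaScript on a made-up document.

```javascript
// Made-up document: the "topic" header does not start with "sensor/",
// but a different header value does.
const message = {
  header: [
    { name: "topic", value: "alerts/fire" },
    { name: "origin", value: "sensor/gateway" },
  ],
};

// Dot-notation semantics: conditions may be satisfied by different entries.
function matchesAcrossElements(doc, name, pattern) {
  return (
    doc.header.some((h) => h.name === name) &&
    doc.header.some((h) => pattern.test(h.value))
  );
}

// $elemMatch semantics: one entry must satisfy both conditions.
function matchesSameElement(doc, name, pattern) {
  return doc.header.some((h) => h.name === name && pattern.test(h.value));
}

console.log(matchesAcrossElements(message, "topic", /^sensor\//)); // true
console.log(matchesSameElement(message, "topic", /^sensor\//));    // false
```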

Performance Optimization

Database Optimization

1. Index Optimization

javascript
// View current indexes
db.mqtt_messages.getIndexes()

// Analyze query performance
db.mqtt_messages.find({ key: "sensor_001" }).explain("executionStats")

// Drop unused indexes
db.mqtt_messages.dropIndex("index_name")

// Create covering index
db.mqtt_messages.createIndex({ key: 1, timestamp: -1, tags: 1 })

2. Sharding Configuration

javascript
// Enable sharding
sh.enableSharding("mqtt_data")

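// Option A: range sharding by timestamp
// (a monotonically increasing key tends to concentrate new writes on one shard)
sh.shardCollection(
  "mqtt_data.mqtt_messages",
  { timestamp: 1 }
)

// Option B: hashed sharding by client ID (spreads writes more evenly)
// Choose one option: a collection is sharded with a single shard key
sh.shardCollection(
  "mqtt_data.mqtt_messages",
  { key: "hashed" }
)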

3. Data Retention Policy

javascript
// Create TTL index (auto-delete data older than 30 days)
// Note: expiry only applies when the indexed field is a BSON Date, not a numeric timestamp
db.mqtt_messages.createIndex(
  { timestamp: 1 },
  { expireAfterSeconds: 2592000 }
)

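// Manually archive old data
// ($merge appends to the archive; $out would replace the whole archive collection on each run)
db.mqtt_messages.aggregate([
  { $match: { timestamp: { $lt: 1640995200 } } },
  { $merge: { into: "mqtt_messages_archive" } }
])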

// Delete archived data
db.mqtt_messages.deleteMany({ timestamp: { $lt: 1640995200 } })

Connector Optimization

1. Connection Pool Configuration

json
{
  "max_pool_size": 100,
  "min_pool_size": 10
}

Configuration Recommendations:

  • Low concurrency: max_pool_size=20, min_pool_size=5
  • Medium concurrency: max_pool_size=50, min_pool_size=10
  • High concurrency: max_pool_size=100+, min_pool_size=20

2. Batch Writes

The MongoDB connector automatically uses batch inserts (insert_many) to improve write performance.
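Batching means grouping pending messages and writing each group with one insert call instead of one call per message. The sketch below shows only the grouping step; the batch size of 100 is an assumed value for illustration, since the connector's actual batch size and flush policy are not stated here.

```javascript
// Splits a list of pending documents into fixed-size batches.
// Each batch would then be written with a single bulk insert.
function toBatches(items, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError("batchSize must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

const pending = Array.from({ length: 250 }, (_, i) => ({ offset: i }));
console.log(toBatches(pending, 100).map((batch) => batch.length)); // [ 100, 100, 50 ]
```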

Monitoring and Troubleshooting

Log Monitoring

The connector outputs detailed operation logs:

INFO  Successfully connected to MongoDB at localhost:27017
INFO  Successfully inserted 100 documents into MongoDB collection 'mqtt_messages'
ERROR Failed to connect to MongoDB at localhost:27017: connection timeout
ERROR Failed to insert documents into MongoDB collection 'mqtt_messages': write concern error
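For quick checks, log lines in this shape can be summarized with a small script. The sketch below assumes each line starts with the level followed by the message, exactly as in the sample above; real log output may carry timestamps or other prefixes, in which case the patterns would need adjusting.

```javascript
// Summarizes connector log lines: total documents reported as inserted,
// plus the list of error messages.
function summarizeLogs(lines) {
  const summary = { inserted: 0, errors: [] };
  for (const line of lines) {
    const match = /^(INFO|ERROR)\s+(.*)$/.exec(line.trim());
    if (!match) continue; // skip lines in an unexpected format
    const [, level, message] = match;
    if (level === "ERROR") summary.errors.push(message);
    const inserted = /^Successfully inserted (\d+) documents/.exec(message);
    if (inserted) summary.inserted += Number(inserted[1]);
  }
  return summary;
}

const logSummary = summarizeLogs([
  "INFO  Successfully connected to MongoDB at localhost:27017",
  "INFO  Successfully inserted 100 documents into MongoDB collection 'mqtt_messages'",
  "ERROR Failed to connect to MongoDB at localhost:27017: connection timeout",
]);
console.log(logSummary.inserted, logSummary.errors.length); // 100 1
```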

MongoDB Monitoring Commands

javascript
// View database status
db.stats()

// View collection status
db.mqtt_messages.stats()

// View current operations
db.currentOp()

// View slow queries
db.system.profile.find().sort({ ts: -1 }).limit(10)

// Enable slow query profiling
db.setProfilingLevel(1, { slowms: 100 })

Common Issues

1. Connection Failure

bash
# Check MongoDB service status
docker exec mongodb mongosh --eval "db.adminCommand('ping')"

# Check network connection
telnet localhost 27017

# View MongoDB logs
docker logs mongodb

2. Authentication Failure

javascript
// Verify user credentials
db.auth("mqtt_user", "mqtt_pass")

// View user information
db.getUsers()

// Change user password
db.changeUserPassword("mqtt_user", "new_password")

3. Low Write Performance

javascript
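// View operation counters (insert/update/delete totals)
db.serverStatus().opcounters

// View cumulative write latency (microseconds) and write operation count
db.serverStatus().opLatencies.writes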

// Check index efficiency
db.mqtt_messages.find({ key: "sensor_001" }).explain("executionStats")

// Optimize write concern level (w: 1 waits only for the primary to acknowledge)
db.mqtt_messages.insertOne(
  { ... },
  { writeConcern: { w: 1 } }
)

4. Disk Space Issues

javascript
// View database size
db.stats()

// Compact collection
db.runCommand({ compact: "mqtt_messages" })

// Delete old data
db.mqtt_messages.deleteMany({
  timestamp: { $lt: Math.floor(Date.now() / 1000) - 2592000 }
})

Summary

The MongoDB connector is an important component of the RobustMQ data integration system, providing flexible and efficient document storage capabilities. With proper configuration and usage, it can meet various business requirements such as IoT data storage, historical data analysis, unstructured data storage, and real-time data processing.

This connector fully leverages MongoDB's document model, dynamic schema, and horizontal scaling capabilities, combined with Rust's memory safety and zero-cost abstraction advantages, to achieve high-performance and highly reliable data storage. It supports single instance, replica set, and sharded cluster deployment modes, making it an essential tool for building modern IoT data platforms and big data analytics systems.

Key Features

✅ Flexible Data Model: Document structure, no predefined schema required
✅ Horizontal Scaling: Supports sharded clusters for massive data
✅ High Availability: Replica set mode provides automatic failover
✅ Rich Query Capabilities: Supports complex queries, aggregations, and geospatial queries
✅ Batch Write Optimization: Automatic batch inserts improve write performance
✅ Data Lifecycle Management: TTL indexes automatically clean expired data
✅ Connection Pool Management: Smart connection pooling enhances concurrency performance