
Connector Architecture

Connectors stream messages from a Broker to external data systems in real time. They run on Broker nodes and are scheduled centrally by the Meta Service.


Overall Structure

The architecture uses Meta for scheduling and Broker for execution:

  • ConnectorScheduler in Meta Service handles Connector assignment, reclamation, and state management
  • Broker nodes execute the actual consume-and-write logic
  • State is synchronized between the two via gRPC (UpdateCache)

[Figure: Connector Overview]


Lifecycle

A Connector has four states, maintained centrally by Meta Service:

| State | Meaning |
|---|---|
| Idle | Unassigned; waiting for the scheduler to assign it to a Broker |
| Running | Assigned to a Broker; the Broker-side thread is active |
| Stopped | Manually stopped by the user; excluded from scheduling |
| Error | Failed during execution; awaiting retry or manual intervention |

State transitions: a Connector is created as Idle. When the scheduler assigns it to a Broker, it becomes Running. If its heartbeat times out or its Broker goes down, it is reset to Idle and re-enters scheduling.
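The transitions above can be modeled as a small state machine. This sketch is illustrative only: `ConnectorStatus` mirrors the four states in the table, while the `Event` enum and the `next_status` function are hypothetical names. Only the Idle/Running transitions described in the text are encoded.

```rust
/// The four Connector states from the table above.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectorStatus {
    Idle,
    Running,
    Stopped,
    Error,
}

/// Hypothetical events that drive the transitions described in the text.
#[derive(Debug, Clone, Copy)]
enum Event {
    Assigned,         // the scheduler picked a Broker
    HeartbeatTimeout, // heartbeat expired or the Broker went down
}

/// Returns the next state, or None if the event does not apply in `current`.
fn next_status(current: ConnectorStatus, event: Event) -> Option<ConnectorStatus> {
    match (current, event) {
        (ConnectorStatus::Idle, Event::Assigned) => Some(ConnectorStatus::Running),
        (ConnectorStatus::Running, Event::HeartbeatTimeout) => Some(ConnectorStatus::Idle),
        // Stopped and Error are never scheduled here; how they are entered
        // and left is not modeled in this sketch.
        _ => None,
    }
}

fn main() {
    let mut status = ConnectorStatus::Idle; // new Connectors start Idle
    for event in [Event::Assigned, Event::HeartbeatTimeout, Event::Assigned] {
        status = next_status(status, event).expect("invalid transition");
        println!("{:?} -> {:?}", event, status);
    }
}
```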

[Figure: Connector Lifecycle]


Meta Scheduling (ConnectorScheduler)

ConnectorScheduler runs two tasks every second:

Heartbeat check (check_heartbeat): Iterates over all Connectors and checks their last heartbeat time. If a Connector has timed out, it is marked Idle, its broker_id assignment is cleared, and it waits for rescheduling.
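Here is a minimal sketch of one `check_heartbeat` pass, assuming an in-memory map of Connector records. `ConnectorEntry`, its fields, and the `timeout` parameter are hypothetical; the real scheduler keeps this state in Meta Service. As an additional assumption, only Running Connectors are checked, since an Idle one has no heartbeat to expire.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Idle,
    Running,
}

/// Hypothetical per-Connector record held by the scheduler.
#[derive(Debug)]
struct ConnectorEntry {
    status: Status,
    broker_id: Option<u64>,
    last_heartbeat: Instant,
}

/// Resets every Running Connector whose last heartbeat is older than `timeout`
/// to Idle and clears its broker_id. Returns the reset names, sorted.
fn check_heartbeat(
    connectors: &mut HashMap<String, ConnectorEntry>,
    now: Instant,
    timeout: Duration,
) -> Vec<String> {
    let mut reset = Vec::new();
    for (name, entry) in connectors.iter_mut() {
        let age = now.saturating_duration_since(entry.last_heartbeat);
        if entry.status == Status::Running && age > timeout {
            entry.status = Status::Idle; // back into the scheduling pool
            entry.broker_id = None; // assignment cleared
            reset.push(name.clone());
        }
    }
    reset.sort(); // HashMap order is arbitrary; sort for stable output
    reset
}

fn main() {
    let start = Instant::now();
    let mut connectors = HashMap::new();
    connectors.insert(
        "stale".to_string(),
        ConnectorEntry { status: Status::Running, broker_id: Some(1), last_heartbeat: start },
    );
    connectors.insert(
        "fresh".to_string(),
        ConnectorEntry { status: Status::Running, broker_id: Some(2), last_heartbeat: start + Duration::from_secs(29) },
    );
    let now = start + Duration::from_secs(30);
    let reset = check_heartbeat(&mut connectors, now, Duration::from_secs(10));
    println!("reset to Idle: {:?}", reset);
}
```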

Assignment and reclamation (start_stop_connector_thread):

  • Collects all Connectors in Idle state
  • Calculates load per Broker (number of assigned Connectors)
  • Assigns each Connector to the Broker with the fewest assigned Connectors (least-load selection)
  • Updates status to Running and notifies the Broker via UpdateCache
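The least-load step can be sketched as follows, assuming the scheduler already has a map from live Broker ids to their current Connector counts. `assign_idle` and the tie-breaking rule (lowest Broker id wins) are assumptions for illustration; the status update and the UpdateCache call are left out.

```rust
use std::collections::HashMap;

/// Assigns each Idle Connector to the least-loaded Broker and updates `load`
/// (Broker id -> assigned Connector count) as it goes, so a single Broker is
/// not handed the whole batch. Ties go to the lowest Broker id (an assumption).
fn assign_idle(idle: &[String], load: &mut HashMap<u64, usize>) -> Vec<(String, u64)> {
    let mut assignments = Vec::new();
    for name in idle {
        let broker = match load.iter().min_by_key(|(id, count)| (**count, **id)) {
            Some((&id, _)) => id,
            None => break, // no live Brokers: leave the rest Idle
        };
        *load.get_mut(&broker).expect("broker came from the map") += 1;
        assignments.push((name.clone(), broker));
    }
    assignments
}

fn main() {
    // Broker 2 is empty, Broker 3 has one Connector, Broker 1 has two.
    let mut load = HashMap::from([(1u64, 2usize), (2, 0), (3, 1)]);
    let idle = vec!["c1".to_string(), "c2".to_string(), "c3".to_string()];
    for (connector, broker) in assign_idle(&idle, &mut load) {
        println!("{connector} -> broker {broker}");
    }
}
```

Incrementing the count immediately after each pick is what spreads a burst of Idle Connectors across Brokers instead of sending them all to whichever Broker was least loaded at the start of the pass.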

Broker-Side Scheduling

The Broker's start_connector_thread runs two checks every second:

Start check (start_connectors): Iterates over all Connectors in the local cache. If a Connector is assigned to the current Broker and its thread is not running, it starts the Sink thread that matches the Connector's ConnectorType.

Reclaim check (gc_connectors): Iterates over all running threads. If a thread's Connector is no longer assigned to the current Broker, it sends that thread a stop signal and updates the Connector's status to Idle.
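Both checks amount to reconciling the local cache against the set of running threads. The sketch below computes what to start and what to stop in one pass. `reconcile`, the `Assignments` alias, and tracking running threads as a `HashSet` of Connector names are hypothetical simplifications; spawning threads and sending stop signals are omitted.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical view of the local cache: Connector name -> assigned Broker id.
type Assignments = HashMap<String, Option<u64>>;

/// One reconciliation pass on Broker `me`. Returns (to_start, to_stop), sorted.
fn reconcile(me: u64, cache: &Assignments, running: &HashSet<String>) -> (Vec<String>, Vec<String>) {
    // start_connectors: assigned to this Broker but no thread yet.
    let mut to_start: Vec<String> = cache
        .iter()
        .filter(|(name, broker)| **broker == Some(me) && !running.contains(*name))
        .map(|(name, _)| name.clone())
        .collect();
    // gc_connectors: a thread is running but the Connector is no longer assigned here
    // (reassigned elsewhere, unassigned, or gone from the cache).
    let mut to_stop: Vec<String> = running
        .iter()
        .filter(|name| cache.get(*name).copied().flatten() != Some(me))
        .cloned()
        .collect();
    to_start.sort();
    to_stop.sort();
    (to_start, to_stop)
}

fn main() {
    let mut cache = Assignments::new();
    cache.insert("kafka-a".to_string(), Some(1));
    cache.insert("redis-b".to_string(), Some(2));
    let running: HashSet<String> = HashSet::from(["redis-b".to_string()]);
    let (to_start, to_stop) = reconcile(1, &cache, &running);
    println!("start: {to_start:?}, stop: {to_stop:?}");
}
```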


Consume Loop (run_connector_loop)

Execution flow for each Connector thread:

  1. Check the stop signal; exit the loop if received
  2. Call read_by_offset on the Storage Adapter to fetch a batch of messages
  3. If no messages are available, sleep briefly and retry
  4. Call ConnectorSink::send_batch to write the messages to the external system
  5. On success: commit the Offset and update the heartbeat timestamp
  6. On failure: apply the failure_action policy (discard, retry then discard, or retry then route to the dead-letter queue)
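The following is a minimal, single-threaded sketch of the six steps above. The `Storage` and `Sink` traits, their signatures, the `batch_size` parameter, and committing the "next offset to read" are all assumptions standing in for the Storage Adapter and `ConnectorSink`. The heartbeat update is omitted, and step 6 applies only the simplest policy, Discard (skip the batch without committing).

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the Storage Adapter: (offset, payload) pairs.
trait Storage {
    fn read_by_offset(&mut self, offset: u64, max: usize) -> Vec<(u64, String)>;
    fn commit_offset(&mut self, next_offset: u64);
}

/// Hypothetical stand-in for ConnectorSink::send_batch.
trait Sink {
    fn send_batch(&mut self, batch: &[(u64, String)]) -> Result<(), String>;
}

/// Runs until `stop` is set; returns the next offset it would read.
fn run_connector_loop(
    stop: &AtomicBool,
    storage: &mut impl Storage,
    sink: &mut impl Sink,
    mut next_offset: u64,
    batch_size: usize,
) -> u64 {
    while !stop.load(Ordering::Relaxed) { // 1. exit once the stop signal is set
        let batch = storage.read_by_offset(next_offset, batch_size); // 2. fetch
        if batch.is_empty() {
            thread::sleep(Duration::from_millis(10)); // 3. idle briefly, then retry
            continue;
        }
        let after_last = batch[batch.len() - 1].0 + 1;
        match sink.send_batch(&batch) { // 4. write to the external system
            Ok(()) => storage.commit_offset(after_last), // 5. commit on success
            Err(e) => eprintln!("batch at {next_offset} dropped: {e}"), // 6. Discard
        }
        next_offset = after_last;
    }
    next_offset
}

/// In-memory storage that raises `stop` once it has nothing left to return.
struct VecStorage {
    msgs: Vec<String>,
    committed: Option<u64>,
    stop: Arc<AtomicBool>,
}

impl Storage for VecStorage {
    fn read_by_offset(&mut self, offset: u64, max: usize) -> Vec<(u64, String)> {
        let batch: Vec<(u64, String)> = self.msgs.iter().enumerate()
            .skip(offset as usize)
            .take(max)
            .map(|(i, m)| (i as u64, m.clone()))
            .collect();
        if batch.is_empty() {
            self.stop.store(true, Ordering::Relaxed); // simulate a user stop once drained
        }
        batch
    }
    fn commit_offset(&mut self, next_offset: u64) {
        self.committed = Some(next_offset);
    }
}

/// Sink that rejects any batch containing offset `fail_on`.
struct FlakySink {
    fail_on: u64,
    received: Vec<String>,
}

impl Sink for FlakySink {
    fn send_batch(&mut self, batch: &[(u64, String)]) -> Result<(), String> {
        if batch.iter().any(|(o, _)| *o == self.fail_on) {
            return Err("external system unavailable".to_string());
        }
        self.received.extend(batch.iter().map(|(_, m)| m.clone()));
        Ok(())
    }
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let msgs = ["a", "b", "c", "d", "e"].iter().map(|s| s.to_string()).collect();
    let mut storage = VecStorage { msgs, committed: None, stop: Arc::clone(&stop) };
    let mut sink = FlakySink { fail_on: 2, received: Vec::new() };
    let next = run_connector_loop(&stop, &mut storage, &mut sink, 0, 2);
    println!("next={next} committed={:?} received={:?}", storage.committed, sink.received);
}
```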

[Figure: Connector Consume Loop]


ConnectorSink Trait

Every external-system Connector implements this trait:

| Method | Description |
|---|---|
| validate() | Validate the connection configuration |
| init_sink() | Initialize external connection resources |
| send_batch() | Send a batch of messages to the external system |
| cleanup_sink() | Release connection resources |

To add a new Connector type: implement ConnectorSink, add the type to the ConnectorType enum, and add dispatch logic in start_thread.
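The four methods can be pictured as the trait below. This is a simplified, synchronous sketch: the real `ConnectorSink` may be async and may use different argument and error types. `MemorySink`, the `Memory` variant, and `build_sink` are hypothetical; they only show where a new type plugs into the `ConnectorType` enum and the dispatch step.

```rust
/// Simplified, synchronous sketch of the four ConnectorSink methods.
trait ConnectorSink {
    fn validate(&self) -> Result<(), String>;
    fn init_sink(&mut self) -> Result<(), String>;
    fn send_batch(&mut self, records: &[String]) -> Result<(), String>;
    fn cleanup_sink(&mut self) -> Result<(), String>;
}

/// Hypothetical sink that "writes" into an in-memory buffer.
struct MemorySink {
    target: String,
    buffer: Option<Vec<String>>, // Some(..) only between init_sink and cleanup_sink
}

impl ConnectorSink for MemorySink {
    fn validate(&self) -> Result<(), String> {
        if self.target.is_empty() { Err("target must not be empty".into()) } else { Ok(()) }
    }
    fn init_sink(&mut self) -> Result<(), String> {
        self.buffer = Some(Vec::new()); // acquire the "connection"
        Ok(())
    }
    fn send_batch(&mut self, records: &[String]) -> Result<(), String> {
        let buf = self.buffer.as_mut().ok_or("init_sink was not called")?;
        buf.extend_from_slice(records);
        Ok(())
    }
    fn cleanup_sink(&mut self) -> Result<(), String> {
        self.buffer = None; // release the "connection"
        Ok(())
    }
}

/// Hypothetical type enum: each new sink type adds a variant here...
#[derive(Debug, Clone, Copy)]
enum ConnectorType {
    Memory,
}

/// ...and a matching arm in the dispatch function.
fn build_sink(kind: ConnectorType, target: &str) -> Box<dyn ConnectorSink> {
    match kind {
        ConnectorType::Memory => Box::new(MemorySink { target: target.to_string(), buffer: None }),
    }
}

fn main() {
    let mut sink = build_sink(ConnectorType::Memory, "demo");
    sink.validate().expect("invalid config");
    sink.init_sink().expect("init failed");
    sink.send_batch(&["m1".to_string(), "m2".to_string()]).expect("send failed");
    sink.cleanup_sink().expect("cleanup failed");
    println!("lifecycle completed");
}
```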


Failure Handling Policy

When send_batch fails, the configured policy determines behavior:

| Policy | Behavior |
|---|---|
| Discard | Drop the batch and continue consuming the next one |
| DiscardAfterRetry | Retry a configured number of times, then discard |
| DeadMessageQueue | Retry a configured number of times, then send the batch to the dead-letter queue |
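The three policies can be sketched as a function that retries a send closure. `FailurePolicy`, `Outcome`, `deliver`, and the `max_retries` field are hypothetical names. Here `max_retries` counts retries after the first attempt, and the sketch only reports `RouteToDeadLetterQueue`, leaving the actual dead-letter write to the caller.

```rust
/// Hypothetical encoding of the three failure_action policies.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FailurePolicy {
    Discard,
    DiscardAfterRetry { max_retries: u32 },
    DeadMessageQueue { max_retries: u32 },
}

#[derive(Debug, PartialEq, Eq)]
enum Outcome {
    Sent,
    Discarded,
    RouteToDeadLetterQueue, // caller forwards the batch to the DLQ
}

/// Tries `send` once, then applies the policy. Retries stop at the first success.
fn deliver<F>(policy: FailurePolicy, mut send: F) -> Outcome
where
    F: FnMut() -> Result<(), String>,
{
    if send().is_ok() {
        return Outcome::Sent;
    }
    match policy {
        FailurePolicy::Discard => Outcome::Discarded,
        FailurePolicy::DiscardAfterRetry { max_retries } => {
            if (0..max_retries).any(|_| send().is_ok()) { Outcome::Sent } else { Outcome::Discarded }
        }
        FailurePolicy::DeadMessageQueue { max_retries } => {
            if (0..max_retries).any(|_| send().is_ok()) {
                Outcome::Sent
            } else {
                Outcome::RouteToDeadLetterQueue
            }
        }
    }
}

fn main() {
    // A closure that captures nothing is Copy, so it can be reused for every policy.
    let always_down = || Err::<(), String>("connection refused".to_string());
    for policy in [
        FailurePolicy::Discard,
        FailurePolicy::DiscardAfterRetry { max_retries: 3 },
        FailurePolicy::DeadMessageQueue { max_retries: 3 },
    ] {
        println!("{:?} => {:?}", policy, deliver(policy, always_down));
    }
}
```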

Heartbeat Mechanism

Broker side: Updates the local heartbeat timestamp on each successful message read. A heartbeat reporting thread periodically batches and sends these updates to Meta Service.

Meta side: ConnectorScheduler periodically checks heartbeats. Connectors that have timed out are reset to Idle and await rescheduling to a healthy Broker.
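On the Broker side, the batching described above might look like the following sketch. Connector threads overwrite a shared timestamp per Connector, and the reporting thread drains the table into one batch. `HeartbeatTable`, `record`, `drain_batch`, and `now_secs` are hypothetical, and sending the batch to Meta Service is left out.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical Broker-local heartbeat table shared by Connector threads
/// and the reporting thread.
#[derive(Default)]
struct HeartbeatTable {
    pending: Mutex<HashMap<String, u64>>, // Connector name -> last heartbeat (Unix seconds)
}

impl HeartbeatTable {
    /// Called from a Connector thread; a later call overwrites an earlier one.
    fn record(&self, connector: &str, ts: u64) {
        self.pending.lock().unwrap().insert(connector.to_string(), ts);
    }

    /// Called by the reporting thread: returns one batch and empties the table.
    fn drain_batch(&self) -> Vec<(String, u64)> {
        let mut batch: Vec<(String, u64)> = self.pending.lock().unwrap().drain().collect();
        batch.sort();
        batch
    }
}

fn now_secs() -> u64 {
    SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0)
}

fn main() {
    let table = HeartbeatTable::default();
    std::thread::scope(|s| {
        for name in ["kafka-sink", "redis-sink"] {
            let table = &table;
            s.spawn(move || table.record(name, now_secs()));
        }
    });
    // One report to Meta Service would carry this whole batch.
    println!("{:?}", table.drain_batch());
}
```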


Offset Management

Each Connector maintains its own consume progress using connector_name as the consumer group name:

  • Tracks the maximum offset per Shard on each read
  • Commits offset after send_batch succeeds (at-least-once semantics)
  • Does not commit offset on failure
  • After a Connector migrates to another Broker, consumption resumes from the last committed offset
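The following sketches the per-Shard bookkeeping described above. It assumes offsets are plain `u64` values and keeps committed offsets in memory. The text implies that in the real system they are committed under the consumer group named after the Connector, which is what lets another Broker resume from them. `OffsetTracker` and its methods are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical per-Connector offset bookkeeping, keyed by shard name.
struct OffsetTracker {
    group: String,                   // consumer group = connector_name
    pending: HashMap<String, u64>,   // max offset read per shard, not yet committed
    committed: HashMap<String, u64>, // last committed offset per shard
}

impl OffsetTracker {
    fn new(connector_name: &str) -> Self {
        OffsetTracker {
            group: connector_name.to_string(),
            pending: HashMap::new(),
            committed: HashMap::new(),
        }
    }

    /// After each read: keep the maximum offset seen per shard.
    fn track(&mut self, records: &[(&str, u64)]) {
        for &(shard, offset) in records {
            let max = self.pending.entry(shard.to_string()).or_insert(offset);
            *max = (*max).max(offset);
        }
    }

    /// send_batch succeeded: commit the tracked offsets (at-least-once).
    fn commit(&mut self) {
        for (shard, offset) in self.pending.drain() {
            self.committed.insert(shard, offset);
        }
    }

    /// send_batch failed: nothing from this batch is committed.
    fn abandon(&mut self) {
        self.pending.clear();
    }

    /// The offset a (possibly migrated) Connector would resume from for `shard`.
    fn last_committed(&self, shard: &str) -> Option<u64> {
        self.committed.get(shard).copied()
    }
}

fn main() {
    let mut tracker = OffsetTracker::new("orders-to-kafka");
    tracker.track(&[("shard-0", 5), ("shard-1", 3), ("shard-0", 7)]);
    tracker.commit();
    tracker.track(&[("shard-0", 9)]);
    tracker.abandon(); // failed batch: shard-0 stays at 7
    println!("group={} shard-0 last committed: {:?}", tracker.group, tracker.last_committed("shard-0"));
}
```

Because offsets are committed only after `send_batch` succeeds, a crash between a successful send and the commit replays that batch, which is why the semantics are at-least-once rather than exactly-once.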

Supported External Systems

| Type | Description |
|---|---|
| Kafka | Write to a Kafka Topic |
| Elasticsearch | Write to an ES Index |
| Redis | Execute Redis command templates |
| MongoDB | Write to a MongoDB Collection |
| MySQL | Write to a MySQL table |
| PostgreSQL | Write to a PostgreSQL table |
| RabbitMQ | Publish to a RabbitMQ Exchange |
| Pulsar | Publish to a Pulsar Topic |
| GreptimeDB | Write to GreptimeDB |
| LocalFile | Write to a local file |

Code Structure

src/connector/src/
├── traits.rs       ConnectorSink trait
├── loops.rs        Consume loop, offset management
├── core.rs         Broker-side scheduling, type dispatch
├── manager.rs      Runtime state management
├── heartbeat.rs    Heartbeat reporting thread
├── failure.rs      Failure handling policy
├── storage/        Meta Service storage interaction
├── kafka/
├── elasticsearch/
├── redis/
├── mongodb/
├── mysql/
├── postgres/
├── rabbitmq/
├── pulsar/
├── greptimedb/
└── file/