Skip to content

RobustMQ Kafka 协议支持

本文档列出 RobustMQ 作为 Kafka Broker 需要支持的 Kafka 协议 API,以及各 API 的优先级和说明。

参考文档:


一、核心数据面(必须支持)

API KeyAPI 名称说明已支持
0Produce生产者写消息
1Fetch消费者拉消息
2ListOffsets查询 topic/partition 的 offset(earliest / latest / by timestamp)
3Metadata客户端启动时获取集群拓扑、topic / partition / broker 信息

二、Consumer Group 管理(必须支持)

API KeyAPI 名称说明已支持
8OffsetCommit提交消费 offset
9OffsetFetch查询已提交 offset
10FindCoordinator找 Group / Transaction Coordinator 所在 broker
11JoinGroup加入消费组,触发 rebalance
12Heartbeat消费者心跳,维持 group 成员资格
13LeaveGroup主动离开消费组
14SyncGrouprebalance 后同步分区分配结果
15DescribeGroups查询 consumer group 状态
16ListGroups列出所有 consumer group
42DeleteGroups删除 consumer group
47OffsetDelete删除已提交的 offset

三、连接与认证(必须支持)

API KeyAPI 名称说明已支持
17SaslHandshakeSASL 认证握手,选择认证机制
18ApiVersions客户端连接后第一个请求,协商支持的 API 版本
36SaslAuthenticateSASL token 交换(SaslHandshake v1+ 使用)

四、Topic / Partition 管理(必须支持)

API KeyAPI 名称说明已支持
19CreateTopics创建 topic
20DeleteTopics删除 topic
21DeleteRecords删除 partition 中指定 offset 之前的消息
37CreatePartitions增加 partition 数量

五、配置管理(必须支持)

API KeyAPI 名称说明已支持
32DescribeConfigs查询 topic / broker 配置
33AlterConfigs修改配置
44IncrementalAlterConfigs增量修改配置(推荐替代 AlterConfigs)

六、事务支持(Exactly-Once,按需支持)

API KeyAPI 名称说明已支持
22InitProducerId获取 producer id(幂等生产 / 事务必须)
24AddPartitionsToTxn将 partition 加入事务
25AddOffsetsToTxn将 consumer offset 加入事务
26EndTxn提交或回滚事务
28TxnOffsetCommit事务内提交 offset
65DescribeTransactions查询事务状态
66ListTransactions列出所有事务

七、ACL 权限控制(可选)

API KeyAPI 名称说明已支持
29DescribeAcls查询 ACL 规则
30CreateAcls创建 ACL 规则
31DeleteAcls删除 ACL 规则

八、Quota 配额管理(可选)

API KeyAPI 名称说明已支持
48DescribeClientQuotas查询客户端配额
49AlterClientQuotas修改客户端配额
50DescribeUserScramCredentials查询 SCRAM 认证信息
51AlterUserScramCredentials修改 SCRAM 认证信息

九、Delegation Token 认证(可选)

API KeyAPI 名称说明已支持
38CreateDelegationToken创建委托令牌
39RenewDelegationToken续期委托令牌
40ExpireDelegationToken使委托令牌过期
41DescribeDelegationToken查询委托令牌信息

十、客户端遥测(可选,KIP-714)

API KeyAPI 名称说明已支持
71GetTelemetrySubscriptions获取客户端遥测订阅配置
72PushTelemetry客户端上报遥测指标
74ListConfigResources列出配置资源

十一、运维管理(可选)

API KeyAPI 名称说明已支持
23OffsetForLeaderEpochfollower / consumer 用于 leader epoch 校验
34AlterReplicaLogDirs迁移日志目录
35DescribeLogDirs查询 log 存储目录信息
43ElectLeaders手动触发 leader 选举
45AlterPartitionReassignments重新分配 partition 副本
46ListPartitionReassignments查询 partition 重分配进度
57UpdateFeatures更新 broker feature flag
60DescribeCluster查询集群信息
61DescribeProducers查询活跃 producer 信息
75DescribeTopicPartitions查询 topic partition 详情

十二、新一代 Consumer Group 协议(可选,KIP-848)

API KeyAPI 名称说明已支持
68ConsumerGroupHeartbeat新 consumer group 协议心跳
69ConsumerGroupDescribe新 consumer group 查询

十三、Share Group(可选,KIP-932,Kafka 4.0+)

Share Group 是 Kafka 4.0 引入的新消费模型,支持消息的共享消费(非独占分区):

API KeyAPI 名称说明已支持
76ShareGroupHeartbeatShare Group 心跳
77ShareGroupDescribe查询 Share Group 状态
78ShareFetchShare Group 拉取消息
79ShareAcknowledgeShare Group 确认消息
90DescribeShareGroupOffsets查询 Share Group offset
91AlterShareGroupOffsets修改 Share Group offset
92DeleteShareGroupOffsets删除 Share Group offset

十四、不需要支持(KRaft / 集群内部通信)

以下 API 仅用于 Kafka 集群内部(副本管理、KRaft 选举、Controller 通信),RobustMQ 不需要实现:

API KeyAPI 名称说明
4LeaderAndIsrbroker 间副本管理
5StopReplicabroker 间停止副本
6UpdateMetadataController 广播元数据
7ControlledShutdownbroker 有序下线
27WriteTxnMarkersTransaction Coordinator 写入 broker,内部
52VoteKRaft 投票
53BeginQuorumEpochKRaft
54EndQuorumEpochKRaft
55DescribeQuorumKRaft quorum 状态
56AlterPartitionKRaft ISR 变更
58EnvelopeController 请求转发
59FetchSnapshotKRaft 日志快照
62BrokerRegistrationKRaft broker 注册
63BrokerHeartbeatKRaft broker 心跳
64UnregisterBrokerKRaft broker 注销
67AllocateProducerIdsKRaft Controller 分配 Producer ID
70ControllerRegistrationKRaft Controller 注册
73AssignReplicasToDirs内部副本目录分配
80AddRaftVoterKRaft
81RemoveRaftVoterKRaft
82UpdateRaftVoterKRaft
83InitializeShareGroupStateShare Group 状态内部存储
84ReadShareGroupStateShare Group 状态内部读取
85WriteShareGroupStateShare Group 状态内部写入
86DeleteShareGroupStateShare Group 状态内部删除
87ReadShareGroupStateSummaryShare Group 状态摘要内部读取

实现路线图

第一阶段:标准客户端可用

实现以下 API 后,标准 Kafka 生产者 / 消费者客户端可正常工作:

text
ApiVersions(18) → Metadata(3) → SaslHandshake(17) / SaslAuthenticate(36)
→ Produce(0) / Fetch(1) / ListOffsets(2)
→ FindCoordinator(10) → JoinGroup(11) / SyncGroup(14) / Heartbeat(12) / LeaveGroup(13)
→ OffsetCommit(8) / OffsetFetch(9)
→ CreateTopics(19) / DeleteTopics(20) → DescribeConfigs(32)

共约 20 个 API

第二阶段:事务支持

在第一阶段基础上,增加事务相关 API(22、24、25、26、28、65、66)。

第三阶段:管控完整性

补全 ACL、Quota、运维管理类 API。