Skip to content

RobustMQ AMQP 协议支持

本文档列出 RobustMQ 作为 AMQP Broker 需要支持的 AMQP 0.9.1 协议命令,以及各命令的优先级和说明。

参考文档:


协议基础

AMQP 0.9.1 基于 TCP,采用帧(Frame)传输。每帧由类型、通道号、长度、payload 和帧结束符(0xCE)组成。

  • 连接相关 img

  • 生产消费相关

img

  • broker 内部逻辑 img

帧类型

帧类型编号说明
Method Frame1控制命令(所有 Class/Method)
Content Header Frame2消息属性(content-type、delivery-mode、headers 等)
Content Body Frame3消息体 payload(可分片)
Heartbeat Frame8心跳保活

消息发布和投递均由 Method + Content Header + Content Body 三帧组合完成。


一、Connection 类(必须支持)

连接级握手,全部在 channel=0 上进行。Broker 需要主动发起 startsecuretuneclose,并处理客户端的响应。

Class.Method编号方向说明已支持
connection.start10.10S→CBroker 发起握手,告知支持的 SASL 机制和 locale
connection.start-ok10.11C→S客户端选择 SASL 机制并发送认证响应
connection.secure10.20S→CBroker 发送 SASL challenge(多轮认证)
connection.secure-ok10.21C→S客户端响应 SASL challenge
connection.tune10.30S→CBroker 提议 channel-max、frame-max、heartbeat 参数
connection.tune-ok10.31C→S客户端确认连接参数
connection.open10.40C→S客户端打开 virtual host
connection.open-ok10.41S→CBroker 确认 vhost 连接成功
connection.close10.50双向任一方发起关闭连接(携带错误码)
connection.close-ok10.51双向确认关闭

二、Channel 类(必须支持)

一条 TCP 连接上可以复用多个 channel,每个 channel 独立进行消息收发。

Class.Method编号方向说明已支持
channel.open20.10C→S客户端开启一个 channel
channel.open-ok20.11S→CBroker 确认 channel 开启
channel.flow20.20双向暂停或恢复消息流(背压控制)
channel.flow-ok20.21双向确认 flow 命令
channel.close20.40双向关闭 channel(携带错误码)
channel.close-ok20.41双向确认关闭

三、Exchange 类(必须支持)

Exchange 是消息路由的核心,支持 direct、fanout、topic、headers 四种类型。

Class.Method编号方向说明已支持
exchange.declare40.10C→S创建或验证 exchange(type/passive/durable/no-wait)
exchange.declare-ok40.11S→C确认创建
exchange.delete40.20C→S删除 exchange(if-unused 选项)
exchange.delete-ok40.21S→C确认删除

四、Queue 类(必须支持)

Class.Method编号方向说明已支持
queue.declare50.10C→S创建或验证队列(passive/durable/exclusive/auto-delete)
queue.declare-ok50.11S→C确认创建,返回队列名、消息数、消费者数
queue.bind50.20C→S绑定队列到 exchange(指定 routing-key)
queue.bind-ok50.21S→C确认绑定
queue.unbind50.50C→S解除队列与 exchange 的绑定
queue.unbind-ok50.51S→C确认解绑
queue.purge50.30C→S清空队列中所有未 ack 的消息
queue.purge-ok50.31S→C确认清空,返回清除消息数
queue.delete50.40C→S删除队列(if-unused / if-empty 选项)
queue.delete-ok50.41S→C确认删除,返回删除消息数

五、Basic 类(必须支持)

Basic 类是 AMQP 0.9.1 的核心,包含消息发布、投递、确认的全部逻辑。

5.1 消费者管理

Class.Method编号方向说明已支持
basic.qos60.10C→S设置预取(prefetch-size、prefetch-count、global)
basic.qos-ok60.11S→C确认 QoS 设置
basic.consume60.20C→S注册消费者,开启 push 模式消费(no-local/no-ack/exclusive)
basic.consume-ok60.21S→C返回 consumer-tag
basic.cancel60.30C→S取消消费者
basic.cancel-ok60.31S→C确认取消

5.2 消息发布

Class.Method编号方向说明已支持
basic.publish60.40C→S发布消息(指定 exchange、routing-key、mandatory、immediate),后跟 Content Header + Body 帧
basic.return60.50S→C退回无法路由的消息(mandatory/immediate 标志触发)

5.3 消息投递

Class.Method编号方向说明已支持
basic.deliver60.60S→CBroker 推送消息给消费者(push 模式),后跟 Content Header + Body 帧
basic.get60.70C→S同步拉取一条消息(pull 模式)
basic.get-ok60.71S→C返回消息,后跟 Content Header + Body 帧
basic.get-empty60.72S→C队列为空时的响应

5.4 消息确认

Class.Method编号方向说明已支持
basic.ack60.80C→S确认消息已处理(支持 multiple 批量 ack)
basic.reject60.90C→S拒绝消息(requeue=true 重新入队,false 丢弃)
basic.recover60.110C→S要求 Broker 重新投递所有未 ack 的消息
basic.recover-ok60.111S→C确认 recover

六、Tx 类(可选,本地事务)

Class.Method编号方向说明已支持
tx.select90.10C→S开启事务模式
tx.select-ok90.11S→C确认事务模式开启
tx.commit90.20C→S提交事务(publish + ack 原子生效)
tx.commit-ok90.21S→C确认提交
tx.rollback90.30C→S回滚事务
tx.rollback-ok90.31S→C确认回滚

七、RabbitMQ 扩展(可选)

以下为 RabbitMQ 对 AMQP 0.9.1 的私有扩展,不在标准规范内,但主流客户端广泛使用:

扩展说明已支持
basic.nack批量拒绝消息(标准 reject 只能拒绝单条)
confirm.select / confirm.select-okPublisher Confirm 模式,Broker 对每条 publish 回 ack/nack
exchange.bind / exchange.bind-okExchange-to-Exchange 绑定
exchange.unbind / exchange.unbind-ok解除 Exchange-to-Exchange 绑定

Publisher Confirm 是生产环境几乎必用的可靠性机制,建议与 basic.publish 一同实现。


Broker 核心业务逻辑

AMQP 0.9.1 中约一半的 method 是 *-ok 的确认回包,Broker 直接构造返回即可。真正需要实现业务逻辑的是以下 5 个方面:

核心能力涉及 Method说明
认证connection.start / start-ok / secure / secure-okSASL 握手,支持 PLAIN、AMQPLAIN 机制
路由exchange.declare + queue.bind + basic.publishpublish → exchange → binding → queue 匹配,支持 direct/fanout/topic/headers
Push 投递basic.consume + basic.deliver维护消费者注册表,遵守 prefetch 窗口,push 消息给消费者
消息确认basic.ack / reject / nack / recover驱动消息状态变更(unacked → acked / requeued / dead-lettered)
事务tx.select / commit / rollbackpublish 和 ack 的原子性保证(可选)

实现路线图

第一阶段:标准客户端可用

实现以下 Method 后,标准 AMQP 客户端(pika、amqplib 等)可正常收发消息:

text
connection: start → start-ok → tune → tune-ok → open → open-ok
channel: open → open-ok
exchange: declare → declare-ok
queue: declare → declare-ok → bind → bind-ok
basic: publish(+Header+Body) → deliver(+Header+Body)
basic: consume → consume-ok → ack
connection/channel: close → close-ok

第二阶段:完整消息语义

在第一阶段基础上,增加:

  • basic.qos(prefetch 流控)
  • basic.reject / basic.recover(消息重投)
  • basic.get / get-ok / get-empty(pull 模式)
  • basic.return(mandatory 消息退回)
  • queue.purge / delete、exchange.delete

第三阶段:可靠性与事务

  • Publisher Confirm(confirm.select + basic.ack/nack)
  • Tx 事务(tx.select / commit / rollback)
  • basic.nack(批量拒绝)
  • Exchange-to-Exchange 绑定