Skip to content

NATS JetStream 协议详解

JetStream 是 NATS 内置的持久化引擎,在 Core NATS Pub/Sub 基础上增加了消息存储、重放、流量控制和精确投递语义。它没有引入任何新的线上协议指令——所有操作都通过标准的 PUB(带 reply-to)和 MSG 完成。服务端在内部暴露一组以 $JS.API. 开头的 Subject,客户端向这些 Subject 发 JSON 请求,服务端把结果发回 reply-to 地址。

为什么需要 JetStream

Core NATS 是纯实时的:消息发出时如果没有在线订阅者,消息就丢了。JetStream 通过 Stream(持久化存储)+ Consumer(消费游标) 解决以下问题:

  • 服务重启后从上次断点继续消费
  • 多个消费者各自独立重放同一批消息
  • 保证消息至少被处理一次(或恰好一次)
  • 消息量超过消费速度时做背压控制

核心概念

Stream

Stream 是消息的持久化容器,绑定一组 Subject(支持通配符)。发布到这些 Subject 的消息会被自动捕获并按保留策略存储。

存储类型(storage):

说明
file持久化到磁盘,服务重启后消息不丢失(默认)
memory存储在内存,性能更高,重启后消息消失

保留策略(retention):

说明
limits按数量、大小、时间上限保留,支持消息重放(默认)
workqueue消息确认后立即删除,每条消息只被一个消费者处理
interest只在有 Consumer 关联时保留,所有 Consumer 确认后删除

Consumer

Consumer 是 Stream 上的消费游标,记录当前消费进度,支持断线续传。同一个 Stream 可以有多个独立 Consumer,各自维护自己的进度。

Durable vs Ephemeral:

类型说明
Durable有名字,断线重连后游标保留,从上次停止处继续
Ephemeral无名字,连接断开后服务端自动清理

Push vs Pull:

类型说明适用场景
Push Consumer服务端主动推消息到 deliver_subject单消费者,有序重放
Pull Consumer客户端主动请求消息,控制消费节奏多消费者水平扩展,背压控制

投递策略(deliver_policy):

说明
all从 Stream 第一条消息开始(默认)
new只接收订阅之后新发布的消息
last从最后一条消息开始
last_per_subject每个 Subject 各取最后一条
by_start_sequenceopt_start_seq 指定序号开始
by_start_timeopt_start_time 指定时间点开始

ACK 策略(ack_policy):

说明
explicit每条消息都要显式 ACK,未 ACK 的消息超时重投(默认)
allACK 一条等于确认此前所有消息
none不需要 ACK,投递后立即视为已处理

Replay 策略(replay_policy):

说明
instant尽可能快地重放,不考虑原始发布的时间间隔(默认)
original按消息原始发布的时间间隔重放,模拟真实流量

协议交互基础

所有 JetStream 管理操作都是标准的 NATS Request-Reply 模式:

text
Client → PUB $JS.API.<operation> <reply-to> <len>
         <json request body>

Server → MSG <reply-to> <sid> <len>
         <json response body>

客户端先 SUB _INBOX.<random> 订阅一个随机地址,将其作为 reply-to 发出请求,然后等服务端把响应发到这个地址。

所有响应在出错时都会包含 error 字段:

字段类型说明
error.codenumberHTTP 状态码(如 400、404、500)
error.err_codenumberNATS 内部错误码
error.descriptionstring错误描述

常见错误码:

err_code说明
10039Stream 不存在
10014Consumer 不存在
10058Stream 名称已存在
10059Subject 已被其他 Stream 绑定
10071Consumer 名称已存在

INFO — 服务端与账户信息

$JS.API.INFO

查询服务端 JetStream 整体状态。请求 payload 为空。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.account_info_response
memorynumber当前内存存储已用字节数
storagenumber当前磁盘存储已用字节数
reserved_memorynumber预留内存字节数
reserved_storagenumber预留磁盘字节数
streamsnumber当前 Stream 数量
consumersnumber当前 Consumer 数量
limits.max_memorynumber内存上限,-1 表示无限
limits.max_storagenumber磁盘上限,-1 表示无限
limits.max_streamsnumberStream 数量上限,-1 表示无限
limits.max_consumersnumberConsumer 数量上限,-1 表示无限
limits.max_ack_pendingnumber最大待 ACK 消息数,-1 表示无限
limits.duplicate_window_maxnumber最大去重窗口时长(纳秒)
limits.max_bytes_requiredbool是否强制要求设置 max_bytes
tiersobject?多租户 tier 配置,可选
errorobject?出错时存在

$JS.API.ACCOUNT.INFO

$JS.API.INFO 返回格式相同,查询当前账户的 JetStream 使用情况。


Stream 管理

Stream 管理交互图

$JS.API.STREAM.CREATE.<stream>

创建一个新 Stream。

请求字段(StreamConfig):

字段类型必填默认值说明
namestringStream 名称,只能包含字母、数字、-_
subjectsstring[][]绑定的 Subject 列表,支持通配符
storagestringfile存储类型:file / memory
retentionstringlimits保留策略:limits / workqueue / interest
max_msgsnumber-1最大消息数,-1 表示无限
max_bytesnumber-1最大存储字节数,-1 表示无限
max_agenumber0消息最大保留时长(纳秒),0 表示无限
max_msg_sizenumber-1单条消息最大字节数,-1 表示无限
max_msgs_per_subjectnumber-1每个 Subject 最大消息数,-1 表示无限
num_replicasnumber1副本数(集群模式)
descriptionstring""描述
discardstring""超限丢弃策略:old(丢旧)/ new(丢新)
duplicate_windownumber?去重窗口时长(纳秒)
deny_deleteboolfalse禁止删除消息
deny_purgeboolfalse禁止清空 Stream
allow_rollup_hdrsboolfalse允许 KV rollup header(KV Store 专用)

返回字段(StreamInfoResponse):

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_create_response
configobject创建后的 StreamConfig,同请求字段
state.messagesnumber当前消息总数
state.bytesnumber当前存储字节数
state.first_seqnumber第一条消息的序号
state.last_seqnumber最后一条消息的序号
state.consumer_countnumber当前 Consumer 数量
createdstring创建时间(RFC3339)
cluster.namestring?集群名称
cluster.leaderstring?当前 Leader 节点名
cluster.replicasarray?副本节点列表
errorobject?出错时存在

$JS.API.STREAM.UPDATE.<stream>

更新 Stream 配置。请求和返回格式与 STREAM.CREATE 完全相同。

$JS.API.STREAM.INFO.<stream>

查询 Stream 信息。请求 payload 为空,返回格式与 STREAM.CREATE 返回相同。

$JS.API.STREAM.DELETE.<stream>

删除 Stream 及其所有消息和 Consumer。请求 payload 为空。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_delete_response
successbool是否删除成功
errorobject?出错时存在

$JS.API.STREAM.LIST

列出所有 Stream 的完整信息。

请求字段:

字段类型必填默认值说明
offsetnumber?0分页偏移量

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_list_response
totalnumberStream 总数
offsetnumber当前页偏移量
limitnumber每页返回数量上限
streamsarrayStreamInfo 列表,结构同 STREAM.CREATE 返回
errorobject?出错时存在

$JS.API.STREAM.NAMES

列出所有 Stream 名称(仅名字,不含完整信息)。请求字段同 STREAM.LIST

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_names_response
totalnumberStream 总数
offsetnumber当前页偏移量
limitnumber每页返回数量上限
streamsstring[]Stream 名称列表
errorobject?出错时存在

$JS.API.STREAM.PURGE.<stream>

清空 Stream 中的消息,支持按 Subject 或序号过滤。

请求字段:

字段类型必填说明
filterstring?只清空匹配此 Subject 的消息
seqnumber?只清空序号小于此值的消息
keepnumber?保留最新的 N 条消息,其余清空

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_purge_response
successbool是否执行成功
purgednumber实际删除的消息数量
errorobject?出错时存在

$JS.API.STREAM.MSG.GET.<stream>

按序号或 Subject 直接获取 Stream 中的某条消息,不更新任何消费进度。

请求字段(三选一):

字段类型说明
seqnumber?按全局序号获取
last_by_subjstring?获取该 Subject 的最后一条消息
next_by_subjstring?获取该 Subject 在当前消费位置之后的下一条消息

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_msg_get_response
message.subjectstring消息的 Subject
message.seqnumber消息在 Stream 中的全局序号
message.datastring消息 payload(base64 编码)
message.timestring发布时间(RFC3339)
message.headersobject?消息 header,键值对
errorobject?出错时存在

$JS.API.STREAM.MSG.DELETE.<stream>

删除 Stream 中的指定消息。

请求字段:

字段类型必填默认值说明
seqnumber要删除的消息序号
no_eraseboolfalsetrue 表示仅标记删除不覆盖内容,false 表示用随机数据覆盖

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_msg_delete_response
successbool是否删除成功
errorobject?出错时存在

$JS.API.STREAM.SNAPSHOT.<stream>

创建 Stream 快照,服务端将数据流式推送到 deliver_subject

请求字段:

字段类型必填默认值说明
deliver_subjectstring接收快照数据的 Subject,客户端需提前 SUB
no_consumersboolfalsetrue 表示快照中不包含 Consumer 信息
check_msgsboolfalsetrue 表示在快照前校验消息完整性

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_snapshot_response
successbool是否成功启动快照
errorobject?出错时存在

$JS.API.STREAM.RESTORE.<stream>

从快照恢复 Stream。请求 payload 为空,客户端在收到响应后将快照数据推送到服务端。返回格式同 STREAM.SNAPSHOT

$JS.API.STREAM.LEADER.STEPDOWN.<stream>

令当前 Stream 的 Raft Leader 主动让位。请求 payload 为空。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.stream_leader_stepdown_response
successbool是否成功触发让位
errorobject?出错时存在

$JS.API.STREAM.PEER.REMOVE.<stream>

从 Stream 的副本集中移除指定节点。

请求字段:

字段类型必填说明
peerstring要移除的节点名称

返回格式同 STREAM.LEADER.STEPDOWN


Consumer 管理

Consumer 管理交互图

$JS.API.CONSUMER.CREATE.<stream> / $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>

创建 Consumer。DURABLE.CREATE 用于创建持久化 Consumer,CREATE 用于创建临时 Consumer。

请求字段:

字段类型必填说明
stream_namestring目标 Stream 名称
actionstring?create(只创建)/ update(只更新)/ 空(创建或更新)
config.durable_namestring?Durable Consumer 名称,Ephemeral 时不设
config.namestring?Consumer 名称(2.10+ 新风格)
config.descriptionstring?描述
config.deliver_subjectstring?Push Consumer 的推送 Subject,Pull Consumer 不设
config.deliver_policystring投递策略,默认 all
config.ack_policystringACK 策略,默认 explicit
config.ack_waitnumber?ACK 超时时长(纳秒),超时重投
config.max_delivernumber?最大重投次数,-1 表示无限
config.replay_policystringReplay 策略,默认 instant
config.filter_subjectstring?只消费匹配此 Subject 的消息(单个)
config.filter_subjectsstring[]?只消费匹配这些 Subject 的消息(多个)
config.opt_start_seqnumber?deliver_policyby_start_sequence 时的起始序号
config.opt_start_timestring?deliver_policyby_start_time 时的起始时间(RFC3339)
config.max_waitingnumber?Pull Consumer 最大并发拉取请求数
config.max_ack_pendingnumber?最大待 ACK 消息数,-1 表示无限
config.flow_controlbool是否开启流量控制(Push Consumer)
config.idle_heartbeatnumber?空闲心跳间隔(纳秒,Push Consumer)
config.backoffnumber[]?重投退避时间序列(纳秒)
config.pause_untilstring?暂停到此时间点(RFC3339)后再开始投递

返回字段(ConsumerInfoResponse):

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_create_response
stream_namestring所属 Stream 名称
namestringConsumer 名称
configobjectConsumer 配置,同请求中的 config
createdstring创建时间(RFC3339)
delivered.consumer_seqnumber最后投递消息的 Consumer 序号
delivered.stream_seqnumber最后投递消息的 Stream 序号
ack_floor.consumer_seqnumber已 ACK 消息的最低 Consumer 序号
ack_floor.stream_seqnumber已 ACK 消息的最低 Stream 序号
num_ack_pendingnumber待 ACK 消息数量
num_redeliverednumber已重投消息数量
num_waitingnumber正在等待的 Pull 请求数量
num_pendingnumber还未投递的消息数量
clusterobject?集群信息
pausedbool?是否处于暂停状态
pause_remainingnumber?暂停剩余时长(纳秒)
errorobject?出错时存在

$JS.API.CONSUMER.INFO.<stream>.<consumer>

查询 Consumer 信息。请求 payload 为空,返回格式同 CONSUMER.CREATE

$JS.API.CONSUMER.DELETE.<stream>.<consumer>

删除 Consumer。请求 payload 为空。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_delete_response
successbool是否删除成功
errorobject?出错时存在

$JS.API.CONSUMER.LIST.<stream>

列出 Stream 下所有 Consumer 的完整信息。请求字段同 STREAM.LIST(分页)。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_list_response
totalnumberConsumer 总数
offsetnumber当前页偏移量
limitnumber每页返回数量上限
consumersarrayConsumerInfo 列表,结构同 CONSUMER.CREATE 返回
errorobject?出错时存在

$JS.API.CONSUMER.NAMES.<stream>

列出 Stream 下所有 Consumer 名称。请求字段同 STREAM.LIST(分页)。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_names_response
totalnumberConsumer 总数
offsetnumber当前页偏移量
limitnumber每页返回数量上限
consumersstring[]Consumer 名称列表
errorobject?出错时存在

$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>

Pull Consumer 主动拉取消息。服务端将消息直接推送到请求的 reply-to 地址,每条消息的 reply-to 字段是 ACK 地址。

请求字段:

字段类型必填默认值说明
batchnumber1一次最多拉取的消息数量
expiresnumber?等待超时(纳秒),超时返回 408 状态
max_bytesnumber?一次最多返回的总字节数
no_waitboolfalsetrue 表示无消息时立即返回而不等待
idle_heartbeatnumber?等待期间心跳间隔(纳秒)

消息以标准 MSG 形式推送,无 JSON 返回体。超时或 Consumer 不存在时服务端返回状态码:

状态说明
408请求超时,无可用消息
409Consumer 已过期或被删除

$JS.API.CONSUMER.LEADER.STEPDOWN.<stream>.<consumer>

令 Consumer 的 Raft Leader 主动让位。请求 payload 为空。

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_leader_stepdown_response
successbool是否成功触发让位
errorobject?出错时存在

$JS.API.CONSUMER.PAUSE.<stream>.<consumer>

暂停或恢复 Consumer 的消息投递。

请求字段:

字段类型必填说明
pause_untilstring?暂停到此时间点(RFC3339),不填则立即恢复

返回字段:

字段类型说明
typestring固定值 io.nats.jetstream.api.v1.consumer_pause_response
pausedbool当前是否处于暂停状态
pause_untilstring?暂停截止时间,未暂停时不存在
errorobject?出错时存在

消息消费与发布

消息消费(Push Consumer)

服务端推送给消费者的每条消息,reply-to 字段就是 ACK 地址:

text
MSG mycons.delivery 1 $JS.ACK.mystream.mycons.1.5.5.1700000000.0 13\r\n
Hello JetStream\r\n

带 ACK 的发布(Publish ACK)

普通 PUB 没有持久化确认。带 reply-to 发布时,服务端在消息写入 Stream 后回复:

返回字段:

字段类型说明
streamstring消息被写入的 Stream 名称
seqnumber消息在 Stream 中的全局序号
duplicatebool是否为重复消息(去重窗口内相同 Nats-Msg-Id

去重发布时,在 Header 中携带 Nats-Msg-Id

text
HPUB orders.new _INBOX.1 44 68\r\n
NATS/1.0\r\n
Nats-Msg-Id: order-123\r\n
\r\n
{"item":"widget","qty":5}\r\n

ACK 机制

ACK 交互图

ACK 地址格式:

text
$JS.ACK.<stream>.<consumer>.<delivered_count>.<stream_seq>.<consumer_seq>.<timestamp>.<pending_msgs>

ACK 地址各字段含义:

字段说明
delivered_count该消息被投递的次数(重投时递增)
stream_seq消息在 Stream 中的全局序号
consumer_seq消息在 Consumer 中的序号
timestamp投递时间戳(纳秒)
pending_msgs当前 Consumer 待处理的消息数量

ACK 操作(向 ACK 地址 PUB 不同 payload):

Payload类型请求字段说明
空或 +ACKAck确认消息已处理
-NAKNakdelay(number?,纳秒,可选)拒绝,立即或延迟重投
-WPIProgress处理中,重置 ack_wait 计时器,防止超时重投
-NXTNextbatch(number,默认 1)确认并立即请求下一条(Pull Consumer 专用)
-TERMTerm终止,不再重投

Direct Get

直接从 Stream 读取消息,绕过 Consumer 层,不更新任何消费进度,适合随机访问场景。响应为 HMSG 格式,元数据通过 Header 返回,body 为消息原始内容。

$JS.API.DIRECT.GET.<stream>

请求字段(四选一):

字段类型说明
seqnumber?按全局序号获取
last_by_subjstring?获取该 Subject 的最后一条消息
next_by_subjstring?获取该 Subject 在指定位置之后的下一条消息
start_timestring?获取指定时间点之后的第一条消息(RFC3339)

响应 HMSG Header:

Header说明
Nats-StreamStream 名称
Nats-Sequence消息全局序号
Nats-Subject消息的 Subject
Nats-Time-Stamp发布时间(RFC3339)
Nats-Num-Pending还有多少条消息未返回(可选)

$JS.API.DIRECT.GET.<stream>.<subject>

获取指定 Subject 的最新一条消息。请求 payload 为空,响应格式同上。

$JS.API.DIRECT.GET.LAST.<stream>.<subject>

语义与 DIRECT.GET.<stream>.<subject> 相同的别名形式,响应格式同上。


Advisory 事件

服务端在 Stream 和 Consumer 状态变化时,向 $JS.EVENT.ADVISORY.* 发布事件(fire-and-forget,无 reply-to)。

所有事件的通用 payload 字段:

字段类型说明
typestring事件类型标识,如 io.nats.jetstream.advisory.v1.stream_action
idstring事件唯一 ID(UUID)
timestampstring事件发生时间(RFC3339)
streamstring相关 Stream 名称(Stream 类事件)
consumerstring?相关 Consumer 名称(Consumer 类事件)
actionstring?操作类型,如 create / delete / update
client.accstring?触发操作的账户名
client.userstring?触发操作的用户名

完整事件列表:

事件Subject触发时机
Stream 创建$JS.EVENT.ADVISORY.STREAM.CREATED.<stream>Stream 被创建时
Stream 删除$JS.EVENT.ADVISORY.STREAM.DELETED.<stream>Stream 被删除时
Stream 更新$JS.EVENT.ADVISORY.STREAM.UPDATED.<stream>Stream 配置变更时
Stream Leader 选举$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.<stream>集群 Leader 变更时
Consumer 创建$JS.EVENT.ADVISORY.CONSUMER.CREATED.<stream>.<consumer>Consumer 被创建时
Consumer 删除$JS.EVENT.ADVISORY.CONSUMER.DELETED.<stream>.<consumer>Consumer 被删除时
Consumer Leader 选举$JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED.<stream>.<consumer>Consumer Leader 变更时
API 审计$JS.EVENT.ADVISORY.API每次 API 调用后
快照开始$JS.EVENT.ADVISORY.STREAM.SNAPSHOT.CREATE.<stream>快照任务开始时
快照完成$JS.EVENT.ADVISORY.STREAM.SNAPSHOT.COMPLETE.<stream>快照任务完成时
恢复开始$JS.EVENT.ADVISORY.STREAM.RESTORE.CREATE.<stream>恢复任务开始时
恢复完成$JS.EVENT.ADVISORY.STREAM.RESTORE.COMPLETE.<stream>恢复任务完成时

KV Store

KV Store 是基于 JetStream Stream 实现的键值存储,底层 Stream 名为 KV_<bucket>,每个 key 的每次写入都是一条消息,Subject 为 $KV.<bucket>.<key>,通过序号实现版本控制。

Bucket 管理

Bucket 的创建和删除本质上是对底层 Stream 的操作,直接使用 $JS.API.STREAM.CREATE.KV_<bucket>,配置中设置 subjects: ["$KV.<bucket>.>"]max_msgs_per_subject: 1(只保留最新值),allow_rollup_hdrs: true

KV 写入(Put)

$KV.<bucket>.<key> 发布消息。可带 reply-to 获取写入确认。

返回字段(KvPutResponse):

字段类型说明
streamstring底层 Stream 名称(KV_<bucket>
seqnumber写入后的消息序号(即该 key 的最新版本号)
duplicatebool是否为重复写入

KV 读取(Get)

通过 Direct Get 读取最新值:$JS.API.DIRECT.GET.LAST.KV_<bucket>.$KV.<bucket>.<key>

响应为 HMSG,Header 字段如下:

Header说明
Nats-Stream底层 Stream 名称
Nats-Subject完整 Subject($KV.<bucket>.<key>
Nats-Sequence当前版本序号
Nats-Time-Stamp写入时间
KV-Operation空 = PUT,DEL = 已删除,PURGE = 已清除
Nats-Num-Pending该 key 的历史版本数

KV 删除(Delete)

$KV.<bucket>.<key> 发布带 KV-Operation: DEL header 的空消息,标记该 key 已删除。写入确认同 Put。

KV 清除(Purge)

$KV.<bucket>.<key> 发布带 KV-Operation: PURGE header 的空消息,删除该 key 的所有历史版本。写入确认同 Put。

KV 列出所有 Key(Keys)

订阅 $KV.<bucket>.> 并通过 Consumer 消费,过滤掉 KV-Operation: DELPURGE 条目后即为当前存活的 key 列表。返回为流式推送,无统一 JSON 响应体。

KV 监听变更(Watch)

$KV.<bucket>.<key|> 创建 Push Consumer,每次 key 发生变化时收到 KV get 格式的推送消息。


Object Store

Object Store 是基于 JetStream Stream 实现的对象存储,底层 Stream 名为 OBJ_<bucket>,支持任意大小的二进制对象通过分块上传存储。每个对象由两部分组成:

  • 元数据:存储在 $OBJ.<bucket>.info.<object> Subject
  • 数据分块:存储在 $OBJ.<bucket>.chunks.<nonce> Subject

Object Bucket 管理

直接使用 $JS.API.STREAM.CREATE.OBJ_<bucket>,配置中设置 subjects: ["$OBJ.<bucket>.>"]

上传对象(Put)

分两步:先发布元数据,再分块发布数据。

元数据字段(ObjectMeta,发布到 $OBJ.<bucket>.info.<object>):

字段类型必填说明
namestring对象名称
descriptionstring描述
noncestring唯一随机标识,数据分块发布到 $OBJ.<bucket>.chunks.<nonce>
bucketstringBucket 名称
chunksnumber总块数
sizenumber对象总字节数
headersobject?自定义 header
optionsobject?扩展选项

数据分块依次发布到 $OBJ.<bucket>.chunks.<nonce>,最后一块发布后服务端自动完成组装。

上传完成后的返回字段(ObjectInfo):

字段类型说明
namestring对象名称
descriptionstring描述
noncestring唯一标识
bucketstringBucket 名称
chunksnumber总块数
sizenumber总字节数
digeststring内容摘要(sha-256=...
deletedbool是否已删除
headersobject?自定义 header

下载对象(Get)

$OBJ.<bucket> 发送请求,服务端流式推送分块到 deliver_subject

请求字段:

字段类型必填说明
namestring对象名称
deliver_subjectstring接收数据块的 Subject,客户端需提前 SUB

无 JSON 响应体,数据以标准 MSG 分块推送到 deliver_subject

获取对象元数据(Info)

$OBJ.<bucket> 发送请求,返回 ObjectInfo,字段同上传完成后的返回。

请求字段:

字段类型必填说明
namestring对象名称

删除对象(Delete)

$OBJ.<bucket> 发送请求。

请求字段:

字段类型必填说明
namestring要删除的对象名称

返回字段:

字段类型说明
successbool是否删除成功

列出所有对象(List)

$OBJ.<bucket> 发送空 payload 请求。

返回字段(ObjectListResponse):

字段类型说明
bucketstringBucket 名称
objectsarrayObjectInfo 列表,结构同上传完成后的返回

监听变更(Watch)

订阅 $OBJ.<bucket>.info.>,每次对象上传或删除时收到 ObjectInfo 推送(deleted: true 表示已删除)。


JetStream Domain

服务端配置了 JetStream Domain 后,API Subject 前缀从 $JS.API 变为 $JS.<domain>.API

text
# 默认
PUB $JS.API.STREAM.INFO.mystream _INBOX.1 0

# 指定 domain
PUB $JS.hub.API.STREAM.INFO.mystream _INBOX.1 0

与 Core NATS 的关系

JetStream 不引入任何新的线上协议命令,协议解析器无需任何修改:

  • 普通 PUB 发出的消息,如果匹配了某个 Stream 的 Subject,会被自动捕获持久化,发布者不感知 JetStream 的存在
  • 想要发布确认(Publish ACK),只需在 PUB 时带上 reply-to
  • Consumer 收到的消息是标准的 MSGHMSG,reply-to 字段是 ACK 地址
  • 所有管理操作都是向 $JS.API.* Subject 发 JSON 请求

实现 JetStream 兼容的工作全部在业务层:识别特定 Subject 前缀、解析 JSON 请求体、执行对应操作、序列化结果通过 MSG 回复。


参考资料

🎉 既然都登录了 GitHub,不如顺手给我们点个 Star 吧!⭐ 你的支持是我们最大的动力 🚀