NATS JetStream 协议详解
JetStream 是 NATS 内置的持久化引擎,在 Core NATS Pub/Sub 基础上增加了消息存储、重放、流量控制和精确投递语义。它没有引入任何新的线上协议指令——所有操作都通过标准的 PUB(带 reply-to)和 MSG 完成。服务端在内部暴露一组以 $JS.API. 开头的 Subject,客户端向这些 Subject 发 JSON 请求,服务端把结果发回 reply-to 地址。
为什么需要 JetStream
Core NATS 是纯实时的:消息发出时如果没有在线订阅者,消息就丢了。JetStream 通过 Stream(持久化存储)+ Consumer(消费游标) 解决以下问题:
- 服务重启后从上次断点继续消费
- 多个消费者各自独立重放同一批消息
- 保证消息至少被处理一次(或恰好一次)
- 消息量超过消费速度时做背压控制
核心概念
Stream
Stream 是消息的持久化容器,绑定一组 Subject(支持通配符)。发布到这些 Subject 的消息会被自动捕获并按保留策略存储。
存储类型(storage):
| 值 | 说明 |
|---|---|
file | 持久化到磁盘,服务重启后消息不丢失(默认) |
memory | 存储在内存,性能更高,重启后消息消失 |
保留策略(retention):
| 值 | 说明 |
|---|---|
limits | 按数量、大小、时间上限保留,支持消息重放(默认) |
workqueue | 消息确认后立即删除,每条消息只被一个消费者处理 |
interest | 只在有 Consumer 关联时保留,所有 Consumer 确认后删除 |
Consumer
Consumer 是 Stream 上的消费游标,记录当前消费进度,支持断线续传。同一个 Stream 可以有多个独立 Consumer,各自维护自己的进度。
Durable vs Ephemeral:
| 类型 | 说明 |
|---|---|
| Durable | 有名字,断线重连后游标保留,从上次停止处继续 |
| Ephemeral | 无名字,连接断开后服务端自动清理 |
Push vs Pull:
| 类型 | 说明 | 适用场景 |
|---|---|---|
| Push Consumer | 服务端主动推消息到 deliver_subject | 单消费者,有序重放 |
| Pull Consumer | 客户端主动请求消息,控制消费节奏 | 多消费者水平扩展,背压控制 |
投递策略(deliver_policy):
| 值 | 说明 |
|---|---|
all | 从 Stream 第一条消息开始(默认) |
new | 只接收订阅之后新发布的消息 |
last | 从最后一条消息开始 |
last_per_subject | 每个 Subject 各取最后一条 |
by_start_sequence | 从 opt_start_seq 指定序号开始 |
by_start_time | 从 opt_start_time 指定时间点开始 |
ACK 策略(ack_policy):
| 值 | 说明 |
|---|---|
explicit | 每条消息都要显式 ACK,未 ACK 的消息超时重投(默认) |
all | ACK 一条等于确认此前所有消息 |
none | 不需要 ACK,投递后立即视为已处理 |
Replay 策略(replay_policy):
| 值 | 说明 |
|---|---|
instant | 尽可能快地重放,不考虑原始发布的时间间隔(默认) |
original | 按消息原始发布的时间间隔重放,模拟真实流量 |
协议交互基础
所有 JetStream 管理操作都是标准的 NATS Request-Reply 模式:
Client → PUB $JS.API.<operation> <reply-to> <len>
<json request body>
Server → MSG <reply-to> <sid> <len>
<json response body>客户端先 SUB _INBOX.<random> 订阅一个随机地址,将其作为 reply-to 发出请求,然后等服务端把响应发到这个地址。
所有响应在出错时都会包含 error 字段:
| 字段 | 类型 | 说明 |
|---|---|---|
error.code | number | HTTP 状态码(如 400、404、500) |
error.err_code | number | NATS 内部错误码 |
error.description | string | 错误描述 |
常见错误码:
err_code | 说明 |
|---|---|
| 10039 | Stream 不存在 |
| 10014 | Consumer 不存在 |
| 10058 | Stream 名称已存在 |
| 10059 | Subject 已被其他 Stream 绑定 |
| 10071 | Consumer 名称已存在 |
INFO — 服务端与账户信息
$JS.API.INFO
查询服务端 JetStream 整体状态。请求 payload 为空。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.account_info_response |
memory | number | 当前内存存储已用字节数 |
storage | number | 当前磁盘存储已用字节数 |
reserved_memory | number | 预留内存字节数 |
reserved_storage | number | 预留磁盘字节数 |
streams | number | 当前 Stream 数量 |
consumers | number | 当前 Consumer 数量 |
limits.max_memory | number | 内存上限,-1 表示无限 |
limits.max_storage | number | 磁盘上限,-1 表示无限 |
limits.max_streams | number | Stream 数量上限,-1 表示无限 |
limits.max_consumers | number | Consumer 数量上限,-1 表示无限 |
limits.max_ack_pending | number | 最大待 ACK 消息数,-1 表示无限 |
limits.duplicate_window_max | number | 最大去重窗口时长(纳秒) |
limits.max_bytes_required | bool | 是否强制要求设置 max_bytes |
tiers | object? | 多租户 tier 配置,可选 |
error | object? | 出错时存在 |
$JS.API.ACCOUNT.INFO
与 $JS.API.INFO 返回格式相同,查询当前账户的 JetStream 使用情况。
Stream 管理

$JS.API.STREAM.CREATE.<stream>
创建一个新 Stream。
请求字段(StreamConfig):
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
name | string | ✓ | — | Stream 名称,只能包含字母、数字、-、_ |
subjects | string[] | — | [] | 绑定的 Subject 列表,支持通配符 |
storage | string | — | file | 存储类型:file / memory |
retention | string | — | limits | 保留策略:limits / workqueue / interest |
max_msgs | number | — | -1 | 最大消息数,-1 表示无限 |
max_bytes | number | — | -1 | 最大存储字节数,-1 表示无限 |
max_age | number | — | 0 | 消息最大保留时长(纳秒),0 表示无限 |
max_msg_size | number | — | -1 | 单条消息最大字节数,-1 表示无限 |
max_msgs_per_subject | number | — | -1 | 每个 Subject 最大消息数,-1 表示无限 |
num_replicas | number | — | 1 | 副本数(集群模式) |
description | string | — | "" | 描述 |
discard | string | — | "" | 超限丢弃策略:old(丢旧)/ new(丢新) |
duplicate_window | number? | — | — | 去重窗口时长(纳秒) |
deny_delete | bool | — | false | 禁止删除消息 |
deny_purge | bool | — | false | 禁止清空 Stream |
allow_rollup_hdrs | bool | — | false | 允许 KV rollup header(KV Store 专用) |
返回字段(StreamInfoResponse):
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_create_response |
config | object | 创建后的 StreamConfig,同请求字段 |
state.messages | number | 当前消息总数 |
state.bytes | number | 当前存储字节数 |
state.first_seq | number | 第一条消息的序号 |
state.last_seq | number | 最后一条消息的序号 |
state.consumer_count | number | 当前 Consumer 数量 |
created | string | 创建时间(RFC3339) |
cluster.name | string? | 集群名称 |
cluster.leader | string? | 当前 Leader 节点名 |
cluster.replicas | array? | 副本节点列表 |
error | object? | 出错时存在 |
$JS.API.STREAM.UPDATE.<stream>
更新 Stream 配置。请求和返回格式与 STREAM.CREATE 完全相同。
$JS.API.STREAM.INFO.<stream>
查询 Stream 信息。请求 payload 为空,返回格式与 STREAM.CREATE 返回相同。
$JS.API.STREAM.DELETE.<stream>
删除 Stream 及其所有消息和 Consumer。请求 payload 为空。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_delete_response |
success | bool | 是否删除成功 |
error | object? | 出错时存在 |
$JS.API.STREAM.LIST
列出所有 Stream 的完整信息。
请求字段:
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
offset | number? | — | 0 | 分页偏移量 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_list_response |
total | number | Stream 总数 |
offset | number | 当前页偏移量 |
limit | number | 每页返回数量上限 |
streams | array | StreamInfo 列表,结构同 STREAM.CREATE 返回 |
error | object? | 出错时存在 |
$JS.API.STREAM.NAMES
列出所有 Stream 名称(仅名字,不含完整信息)。请求字段同 STREAM.LIST。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_names_response |
total | number | Stream 总数 |
offset | number | 当前页偏移量 |
limit | number | 每页返回数量上限 |
streams | string[] | Stream 名称列表 |
error | object? | 出错时存在 |
$JS.API.STREAM.PURGE.<stream>
清空 Stream 中的消息,支持按 Subject 或序号过滤。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
filter | string? | — | 只清空匹配此 Subject 的消息 |
seq | number? | — | 只清空序号小于此值的消息 |
keep | number? | — | 保留最新的 N 条消息,其余清空 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_purge_response |
success | bool | 是否执行成功 |
purged | number | 实际删除的消息数量 |
error | object? | 出错时存在 |
$JS.API.STREAM.MSG.GET.<stream>
按序号或 Subject 直接获取 Stream 中的某条消息,不更新任何消费进度。
请求字段(三选一):
| 字段 | 类型 | 说明 |
|---|---|---|
seq | number? | 按全局序号获取 |
last_by_subj | string? | 获取该 Subject 的最后一条消息 |
next_by_subj | string? | 获取该 Subject 在当前消费位置之后的下一条消息 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_msg_get_response |
message.subject | string | 消息的 Subject |
message.seq | number | 消息在 Stream 中的全局序号 |
message.data | string | 消息 payload(base64 编码) |
message.time | string | 发布时间(RFC3339) |
message.headers | object? | 消息 header,键值对 |
error | object? | 出错时存在 |
$JS.API.STREAM.MSG.DELETE.<stream>
删除 Stream 中的指定消息。
请求字段:
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
seq | number | ✓ | — | 要删除的消息序号 |
no_erase | bool | — | false | true 表示仅标记删除不覆盖内容,false 表示用随机数据覆盖 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_msg_delete_response |
success | bool | 是否删除成功 |
error | object? | 出错时存在 |
$JS.API.STREAM.SNAPSHOT.<stream>
创建 Stream 快照,服务端将数据流式推送到 deliver_subject。
请求字段:
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
deliver_subject | string | ✓ | — | 接收快照数据的 Subject,客户端需提前 SUB |
no_consumers | bool | — | false | true 表示快照中不包含 Consumer 信息 |
check_msgs | bool | — | false | true 表示在快照前校验消息完整性 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_snapshot_response |
success | bool | 是否成功启动快照 |
error | object? | 出错时存在 |
$JS.API.STREAM.RESTORE.<stream>
从快照恢复 Stream。请求 payload 为空,客户端在收到响应后将快照数据推送到服务端。返回格式同 STREAM.SNAPSHOT。
$JS.API.STREAM.LEADER.STEPDOWN.<stream>
令当前 Stream 的 Raft Leader 主动让位。请求 payload 为空。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.stream_leader_stepdown_response |
success | bool | 是否成功触发让位 |
error | object? | 出错时存在 |
$JS.API.STREAM.PEER.REMOVE.<stream>
从 Stream 的副本集中移除指定节点。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
peer | string | ✓ | 要移除的节点名称 |
返回格式同 STREAM.LEADER.STEPDOWN。
Consumer 管理

$JS.API.CONSUMER.CREATE.<stream> / $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>
创建 Consumer。DURABLE.CREATE 用于创建持久化 Consumer,CREATE 用于创建临时 Consumer。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
stream_name | string | ✓ | 目标 Stream 名称 |
action | string? | — | create(只创建)/ update(只更新)/ 空(创建或更新) |
config.durable_name | string? | — | Durable Consumer 名称,Ephemeral 时不设 |
config.name | string? | — | Consumer 名称(2.10+ 新风格) |
config.description | string? | — | 描述 |
config.deliver_subject | string? | — | Push Consumer 的推送 Subject,Pull Consumer 不设 |
config.deliver_policy | string | — | 投递策略,默认 all |
config.ack_policy | string | — | ACK 策略,默认 explicit |
config.ack_wait | number? | — | ACK 超时时长(纳秒),超时重投 |
config.max_deliver | number? | — | 最大重投次数,-1 表示无限 |
config.replay_policy | string | — | Replay 策略,默认 instant |
config.filter_subject | string? | — | 只消费匹配此 Subject 的消息(单个) |
config.filter_subjects | string[]? | — | 只消费匹配这些 Subject 的消息(多个) |
config.opt_start_seq | number? | — | deliver_policy 为 by_start_sequence 时的起始序号 |
config.opt_start_time | string? | — | deliver_policy 为 by_start_time 时的起始时间(RFC3339) |
config.max_waiting | number? | — | Pull Consumer 最大并发拉取请求数 |
config.max_ack_pending | number? | — | 最大待 ACK 消息数,-1 表示无限 |
config.flow_control | bool | — | 是否开启流量控制(Push Consumer) |
config.idle_heartbeat | number? | — | 空闲心跳间隔(纳秒,Push Consumer) |
config.backoff | number[]? | — | 重投退避时间序列(纳秒) |
config.pause_until | string? | — | 暂停到此时间点(RFC3339)后再开始投递 |
返回字段(ConsumerInfoResponse):
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_create_response |
stream_name | string | 所属 Stream 名称 |
name | string | Consumer 名称 |
config | object | Consumer 配置,同请求中的 config |
created | string | 创建时间(RFC3339) |
delivered.consumer_seq | number | 最后投递消息的 Consumer 序号 |
delivered.stream_seq | number | 最后投递消息的 Stream 序号 |
ack_floor.consumer_seq | number | 已 ACK 消息的最低 Consumer 序号 |
ack_floor.stream_seq | number | 已 ACK 消息的最低 Stream 序号 |
num_ack_pending | number | 待 ACK 消息数量 |
num_redelivered | number | 已重投消息数量 |
num_waiting | number | 正在等待的 Pull 请求数量 |
num_pending | number | 还未投递的消息数量 |
cluster | object? | 集群信息 |
paused | bool? | 是否处于暂停状态 |
pause_remaining | number? | 暂停剩余时长(纳秒) |
error | object? | 出错时存在 |
$JS.API.CONSUMER.INFO.<stream>.<consumer>
查询 Consumer 信息。请求 payload 为空,返回格式同 CONSUMER.CREATE。
$JS.API.CONSUMER.DELETE.<stream>.<consumer>
删除 Consumer。请求 payload 为空。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_delete_response |
success | bool | 是否删除成功 |
error | object? | 出错时存在 |
$JS.API.CONSUMER.LIST.<stream>
列出 Stream 下所有 Consumer 的完整信息。请求字段同 STREAM.LIST(分页)。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_list_response |
total | number | Consumer 总数 |
offset | number | 当前页偏移量 |
limit | number | 每页返回数量上限 |
consumers | array | ConsumerInfo 列表,结构同 CONSUMER.CREATE 返回 |
error | object? | 出错时存在 |
$JS.API.CONSUMER.NAMES.<stream>
列出 Stream 下所有 Consumer 名称。请求字段同 STREAM.LIST(分页)。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_names_response |
total | number | Consumer 总数 |
offset | number | 当前页偏移量 |
limit | number | 每页返回数量上限 |
consumers | string[] | Consumer 名称列表 |
error | object? | 出错时存在 |
$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>
Pull Consumer 主动拉取消息。服务端将消息直接推送到请求的 reply-to 地址,每条消息的 reply-to 字段是 ACK 地址。
请求字段:
| 字段 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
batch | number | — | 1 | 一次最多拉取的消息数量 |
expires | number? | — | — | 等待超时(纳秒),超时返回 408 状态 |
max_bytes | number? | — | — | 一次最多返回的总字节数 |
no_wait | bool | — | false | true 表示无消息时立即返回而不等待 |
idle_heartbeat | number? | — | — | 等待期间心跳间隔(纳秒) |
消息以标准 MSG 形式推送,无 JSON 返回体。超时或 Consumer 不存在时服务端返回状态码:
| 状态 | 说明 |
|---|---|
408 | 请求超时,无可用消息 |
409 | Consumer 已过期或被删除 |
$JS.API.CONSUMER.LEADER.STEPDOWN.<stream>.<consumer>
令 Consumer 的 Raft Leader 主动让位。请求 payload 为空。
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_leader_stepdown_response |
success | bool | 是否成功触发让位 |
error | object? | 出错时存在 |
$JS.API.CONSUMER.PAUSE.<stream>.<consumer>
暂停或恢复 Consumer 的消息投递。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
pause_until | string? | — | 暂停到此时间点(RFC3339),不填则立即恢复 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 固定值 io.nats.jetstream.api.v1.consumer_pause_response |
paused | bool | 当前是否处于暂停状态 |
pause_until | string? | 暂停截止时间,未暂停时不存在 |
error | object? | 出错时存在 |
消息消费与发布
消息消费(Push Consumer)
服务端推送给消费者的每条消息,reply-to 字段就是 ACK 地址:
MSG mycons.delivery 1 $JS.ACK.mystream.mycons.1.5.5.1700000000.0 13\r\n
Hello JetStream\r\n带 ACK 的发布(Publish ACK)
普通 PUB 没有持久化确认。带 reply-to 发布时,服务端在消息写入 Stream 后回复:
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
stream | string | 消息被写入的 Stream 名称 |
seq | number | 消息在 Stream 中的全局序号 |
duplicate | bool | 是否为重复消息(去重窗口内相同 Nats-Msg-Id) |
去重发布时,在 Header 中携带 Nats-Msg-Id:
HPUB orders.new _INBOX.1 44 68\r\n
NATS/1.0\r\n
Nats-Msg-Id: order-123\r\n
\r\n
{"item":"widget","qty":5}\r\nACK 机制

ACK 地址格式:
$JS.ACK.<stream>.<consumer>.<delivered_count>.<stream_seq>.<consumer_seq>.<timestamp>.<pending_msgs>ACK 地址各字段含义:
| 字段 | 说明 |
|---|---|
delivered_count | 该消息被投递的次数(重投时递增) |
stream_seq | 消息在 Stream 中的全局序号 |
consumer_seq | 消息在 Consumer 中的序号 |
timestamp | 投递时间戳(纳秒) |
pending_msgs | 当前 Consumer 待处理的消息数量 |
ACK 操作(向 ACK 地址 PUB 不同 payload):
| Payload | 类型 | 请求字段 | 说明 |
|---|---|---|---|
空或 +ACK | Ack | 无 | 确认消息已处理 |
-NAK | Nak | delay(number?,纳秒,可选) | 拒绝,立即或延迟重投 |
-WPI | Progress | 无 | 处理中,重置 ack_wait 计时器,防止超时重投 |
-NXT | Next | batch(number,默认 1) | 确认并立即请求下一条(Pull Consumer 专用) |
-TERM | Term | 无 | 终止,不再重投 |
Direct Get
直接从 Stream 读取消息,绕过 Consumer 层,不更新任何消费进度,适合随机访问场景。响应为 HMSG 格式,元数据通过 Header 返回,body 为消息原始内容。
$JS.API.DIRECT.GET.<stream>
请求字段(四选一):
| 字段 | 类型 | 说明 |
|---|---|---|
seq | number? | 按全局序号获取 |
last_by_subj | string? | 获取该 Subject 的最后一条消息 |
next_by_subj | string? | 获取该 Subject 在指定位置之后的下一条消息 |
start_time | string? | 获取指定时间点之后的第一条消息(RFC3339) |
响应 HMSG Header:
| Header | 说明 |
|---|---|
Nats-Stream | Stream 名称 |
Nats-Sequence | 消息全局序号 |
Nats-Subject | 消息的 Subject |
Nats-Time-Stamp | 发布时间(RFC3339) |
Nats-Num-Pending | 还有多少条消息未返回(可选) |
$JS.API.DIRECT.GET.<stream>.<subject>
获取指定 Subject 的最新一条消息。请求 payload 为空,响应格式同上。
$JS.API.DIRECT.GET.LAST.<stream>.<subject>
语义与 DIRECT.GET.<stream>.<subject> 相同的别名形式,响应格式同上。
Advisory 事件
服务端在 Stream 和 Consumer 状态变化时,向 $JS.EVENT.ADVISORY.* 发布事件(fire-and-forget,无 reply-to)。
所有事件的通用 payload 字段:
| 字段 | 类型 | 说明 |
|---|---|---|
type | string | 事件类型标识,如 io.nats.jetstream.advisory.v1.stream_action |
id | string | 事件唯一 ID(UUID) |
timestamp | string | 事件发生时间(RFC3339) |
stream | string | 相关 Stream 名称(Stream 类事件) |
consumer | string? | 相关 Consumer 名称(Consumer 类事件) |
action | string? | 操作类型,如 create / delete / update |
client.acc | string? | 触发操作的账户名 |
client.user | string? | 触发操作的用户名 |
完整事件列表:
| 事件 | Subject | 触发时机 |
|---|---|---|
| Stream 创建 | $JS.EVENT.ADVISORY.STREAM.CREATED.<stream> | Stream 被创建时 |
| Stream 删除 | $JS.EVENT.ADVISORY.STREAM.DELETED.<stream> | Stream 被删除时 |
| Stream 更新 | $JS.EVENT.ADVISORY.STREAM.UPDATED.<stream> | Stream 配置变更时 |
| Stream Leader 选举 | $JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.<stream> | 集群 Leader 变更时 |
| Consumer 创建 | $JS.EVENT.ADVISORY.CONSUMER.CREATED.<stream>.<consumer> | Consumer 被创建时 |
| Consumer 删除 | $JS.EVENT.ADVISORY.CONSUMER.DELETED.<stream>.<consumer> | Consumer 被删除时 |
| Consumer Leader 选举 | $JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED.<stream>.<consumer> | Consumer Leader 变更时 |
| API 审计 | $JS.EVENT.ADVISORY.API | 每次 API 调用后 |
| 快照开始 | $JS.EVENT.ADVISORY.STREAM.SNAPSHOT.CREATE.<stream> | 快照任务开始时 |
| 快照完成 | $JS.EVENT.ADVISORY.STREAM.SNAPSHOT.COMPLETE.<stream> | 快照任务完成时 |
| 恢复开始 | $JS.EVENT.ADVISORY.STREAM.RESTORE.CREATE.<stream> | 恢复任务开始时 |
| 恢复完成 | $JS.EVENT.ADVISORY.STREAM.RESTORE.COMPLETE.<stream> | 恢复任务完成时 |
KV Store
KV Store 是基于 JetStream Stream 实现的键值存储,底层 Stream 名为 KV_<bucket>,每个 key 的每次写入都是一条消息,Subject 为 $KV.<bucket>.<key>,通过序号实现版本控制。
Bucket 管理
Bucket 的创建和删除本质上是对底层 Stream 的操作,直接使用 $JS.API.STREAM.CREATE.KV_<bucket>,配置中设置 subjects: ["$KV.<bucket>.>"],max_msgs_per_subject: 1(只保留最新值),allow_rollup_hdrs: true。
KV 写入(Put)
向 $KV.<bucket>.<key> 发布消息。可带 reply-to 获取写入确认。
返回字段(KvPutResponse):
| 字段 | 类型 | 说明 |
|---|---|---|
stream | string | 底层 Stream 名称(KV_<bucket>) |
seq | number | 写入后的消息序号(即该 key 的最新版本号) |
duplicate | bool | 是否为重复写入 |
KV 读取(Get)
通过 Direct Get 读取最新值:$JS.API.DIRECT.GET.LAST.KV_<bucket>.$KV.<bucket>.<key>。
响应为 HMSG,Header 字段如下:
| Header | 说明 |
|---|---|
Nats-Stream | 底层 Stream 名称 |
Nats-Subject | 完整 Subject($KV.<bucket>.<key>) |
Nats-Sequence | 当前版本序号 |
Nats-Time-Stamp | 写入时间 |
KV-Operation | 空 = PUT,DEL = 已删除,PURGE = 已清除 |
Nats-Num-Pending | 该 key 的历史版本数 |
KV 删除(Delete)
向 $KV.<bucket>.<key> 发布带 KV-Operation: DEL header 的空消息,标记该 key 已删除。写入确认同 Put。
KV 清除(Purge)
向 $KV.<bucket>.<key> 发布带 KV-Operation: PURGE header 的空消息,删除该 key 的所有历史版本。写入确认同 Put。
KV 列出所有 Key(Keys)
订阅 $KV.<bucket>.> 并通过 Consumer 消费,过滤掉 KV-Operation: DEL 和 PURGE 条目后即为当前存活的 key 列表。返回为流式推送,无统一 JSON 响应体。
KV 监听变更(Watch)
对 $KV.<bucket>.<key|> 创建 Push Consumer,每次 key 发生变化时收到 KV get 格式的推送消息。
Object Store
Object Store 是基于 JetStream Stream 实现的对象存储,底层 Stream 名为 OBJ_<bucket>,支持任意大小的二进制对象通过分块上传存储。每个对象由两部分组成:
- 元数据:存储在
$OBJ.<bucket>.info.<object>Subject - 数据分块:存储在
$OBJ.<bucket>.chunks.<nonce>Subject
Object Bucket 管理
直接使用 $JS.API.STREAM.CREATE.OBJ_<bucket>,配置中设置 subjects: ["$OBJ.<bucket>.>"]。
上传对象(Put)
分两步:先发布元数据,再分块发布数据。
元数据字段(ObjectMeta,发布到 $OBJ.<bucket>.info.<object>):
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name | string | ✓ | 对象名称 |
description | string | — | 描述 |
nonce | string | ✓ | 唯一随机标识,数据分块发布到 $OBJ.<bucket>.chunks.<nonce> |
bucket | string | ✓ | Bucket 名称 |
chunks | number | ✓ | 总块数 |
size | number | ✓ | 对象总字节数 |
headers | object? | — | 自定义 header |
options | object? | — | 扩展选项 |
数据分块依次发布到 $OBJ.<bucket>.chunks.<nonce>,最后一块发布后服务端自动完成组装。
上传完成后的返回字段(ObjectInfo):
| 字段 | 类型 | 说明 |
|---|---|---|
name | string | 对象名称 |
description | string | 描述 |
nonce | string | 唯一标识 |
bucket | string | Bucket 名称 |
chunks | number | 总块数 |
size | number | 总字节数 |
digest | string | 内容摘要(sha-256=...) |
deleted | bool | 是否已删除 |
headers | object? | 自定义 header |
下载对象(Get)
向 $OBJ.<bucket> 发送请求,服务端流式推送分块到 deliver_subject。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name | string | ✓ | 对象名称 |
deliver_subject | string | ✓ | 接收数据块的 Subject,客户端需提前 SUB |
无 JSON 响应体,数据以标准 MSG 分块推送到 deliver_subject。
获取对象元数据(Info)
向 $OBJ.<bucket> 发送请求,返回 ObjectInfo,字段同上传完成后的返回。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name | string | ✓ | 对象名称 |
删除对象(Delete)
向 $OBJ.<bucket> 发送请求。
请求字段:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
name | string | ✓ | 要删除的对象名称 |
返回字段:
| 字段 | 类型 | 说明 |
|---|---|---|
success | bool | 是否删除成功 |
列出所有对象(List)
向 $OBJ.<bucket> 发送空 payload 请求。
返回字段(ObjectListResponse):
| 字段 | 类型 | 说明 |
|---|---|---|
bucket | string | Bucket 名称 |
objects | array | ObjectInfo 列表,结构同上传完成后的返回 |
监听变更(Watch)
订阅 $OBJ.<bucket>.info.>,每次对象上传或删除时收到 ObjectInfo 推送(deleted: true 表示已删除)。
JetStream Domain
服务端配置了 JetStream Domain 后,API Subject 前缀从 $JS.API 变为 $JS.<domain>.API:
# 默认
PUB $JS.API.STREAM.INFO.mystream _INBOX.1 0
# 指定 domain
PUB $JS.hub.API.STREAM.INFO.mystream _INBOX.1 0与 Core NATS 的关系
JetStream 不引入任何新的线上协议命令,协议解析器无需任何修改:
- 普通
PUB发出的消息,如果匹配了某个 Stream 的 Subject,会被自动捕获持久化,发布者不感知 JetStream 的存在 - 想要发布确认(Publish ACK),只需在
PUB时带上 reply-to - Consumer 收到的消息是标准的
MSG或HMSG,reply-to 字段是 ACK 地址 - 所有管理操作都是向
$JS.API.*Subject 发 JSON 请求
实现 JetStream 兼容的工作全部在业务层:识别特定 Subject 前缀、解析 JSON 请求体、执行对应操作、序列化结果通过 MSG 回复。
