
NATS JetStream Protocol Reference

JetStream is the persistence engine built into NATS. It adds message storage, replay, flow control, and configurable delivery guarantees on top of Core NATS publish/subscribe. It introduces no new wire protocol commands: every operation uses the standard PUB (with a reply-to), or HPUB when headers are needed, and the matching MSG/HMSG. The server exposes a set of Subjects under the $JS.API. prefix; clients send JSON requests to these Subjects, and the server sends the result to the request's reply-to address.

Why JetStream

Core NATS is purely real-time: if no subscriber is online when a message is published, the message is lost. JetStream addresses this with two abstractions, the Stream (persistent storage) and the Consumer (consumption cursor), which together make it possible to:

  • Resume consumption from the last checkpoint after a service restart
  • Multiple consumers independently replay the same batch of messages
  • Guarantee messages are processed at least once (or exactly once)
  • Apply backpressure when message volume exceeds consumption speed

Core Concepts

Stream

A Stream is a persistent container for messages, bound to a set of Subjects (wildcards supported). Messages published to these Subjects are automatically captured and stored according to the retention policy.
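To make the wildcard binding concrete, here is a minimal sketch of NATS subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The function name `subject_matches` is illustrative and not part of any NATS library.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if `subject` matches a NATS pattern using * and > wildcards."""
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token to match.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    # Without a trailing '>', both sides must have the same number of tokens.
    return len(p_tokens) == len(s_tokens)
```

A Stream bound to orders.> would therefore capture orders.new and orders.new.eu, while one bound to orders.* would capture only the former.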

Storage types (storage):

| Value | Description |
| --- | --- |
| file | Persisted to disk; messages survive server restarts (default) |
| memory | Stored in memory; higher performance but messages are lost on restart |

Retention policies (retention):

| Value | Description |
| --- | --- |
| limits | Retain based on count, size, or age limits; supports message replay (default) |
| workqueue | Messages are deleted immediately after acknowledgement; each message is processed by only one consumer |
| interest | Messages are only retained while a Consumer is associated; deleted after all Consumers acknowledge |

Consumer

A Consumer is a consumption cursor on a Stream that records how far consumption has progressed and supports resumption after disconnection. The same Stream can have multiple independent Consumers, each maintaining its own progress.

Durable vs Ephemeral:

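| Type | Description |
| --- | --- |
| Durable | Has a name; cursor is retained after disconnection, resumes from where it left off |
| Ephemeral | No durable name; the server removes it automatically once it has been inactive (no subscriber or pending pull request) for a configured period, for example after the connection closes |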

Push vs Pull:

| Type | Description | Use Case |
| --- | --- | --- |
| Push Consumer | Server proactively pushes messages to a specified Subject | Single consumer, ordered replay |
| Pull Consumer | Client actively requests messages, controlling consumption pace | Multi-consumer horizontal scaling, backpressure control |

Deliver policies (deliver_policy):

| Value | Description |
| --- | --- |
| all | Start from the first message in the Stream (default) |
| new | Only receive messages published after subscribing |
| last | Start from the last message |
| last_per_subject | Take the last message per Subject |
| by_start_sequence | Start from the sequence number specified by opt_start_seq |
| by_start_time | Start from the timestamp specified by opt_start_time |

ACK policies (ack_policy):

| Value | Description |
| --- | --- |
| explicit | Each message must be explicitly ACKed; unACKed messages are redelivered after timeout (default) |
| all | ACKing one message confirms all previous messages as well |
| none | No ACK required; messages are immediately considered processed after delivery |

Replay policies (replay_policy):

| Value | Description |
| --- | --- |
| instant | Replay messages as fast as possible, ignoring original publish timing (default) |
| original | Replay at the same intervals as originally published, simulating real traffic |

Protocol Basics

All JetStream management operations use the standard NATS Request-Reply pattern:

```text
Client → PUB $JS.API.<operation> <reply-to> <len>
         <json request body>

Server → MSG <reply-to> <sid> <len>
         <json response body>
```

The client first subscribes (SUB) to a random inbox Subject such as _INBOX.<random>, passes that Subject as the reply-to of the request, and then waits for the server to deliver the response there.
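The request flow above can be sketched as raw protocol frames. This is a minimal illustration that only builds bytes and does no networking; the helper name `build_api_request` and the fixed subscription id 1 are assumptions made for the example.

```python
import json
import secrets
from typing import Optional, Tuple

def build_api_request(operation: str, body: Optional[dict] = None,
                      inbox: Optional[str] = None) -> Tuple[str, bytes]:
    """Build the SUB + PUB frames for one JetStream API request."""
    inbox = inbox or f"_INBOX.{secrets.token_hex(8)}"  # random reply-to Subject
    payload = json.dumps(body).encode() if body is not None else b""
    frames = (
        f"SUB {inbox} 1\r\n".encode()  # subscribe before publishing the request
        + f"PUB $JS.API.{operation} {inbox} {len(payload)}\r\n".encode()
        + payload + b"\r\n"
    )
    return inbox, frames
```

The caller would write `frames` to the connection and then read the MSG that arrives on `inbox`.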

All responses include an error field on failure:

| Field | Type | Description |
| --- | --- | --- |
| error.code | number | HTTP-style status code (e.g. 400, 404, 500) |
| error.err_code | number | NATS internal error code |
| error.description | string | Error description |

Common error codes:

| err_code | Description |
| --- | --- |
| 10059 | Stream does not exist |
| 10014 | Consumer does not exist |
| 10058 | Stream name already in use (with a different configuration) |
| 10065 | Subjects overlap with an existing Stream |
| 10013 | Consumer name already in use |
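A client usually checks every response for the error object before using it. The following is a small sketch of that check; the exception class `JetStreamApiError` and the function `parse_api_response` are hypothetical names introduced for illustration.

```python
import json

class JetStreamApiError(Exception):
    """Raised when a JetStream API response carries an `error` object."""
    def __init__(self, code, err_code, description):
        super().__init__(f"{code} (err_code {err_code}): {description}")
        self.code = code
        self.err_code = err_code
        self.description = description

def parse_api_response(payload: bytes) -> dict:
    """Decode a JSON response and raise if it reports an error."""
    response = json.loads(payload)
    error = response.get("error")
    if error:
        raise JetStreamApiError(error.get("code"), error.get("err_code"),
                                error.get("description", ""))
    return response
```

Matching on err_code rather than on the description text keeps the handling stable if the wording of a message changes.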

INFO — Server and Account Info

$JS.API.INFO

Query the overall JetStream status of the server. Request payload is empty.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.account_info_response |
| memory | number | Current memory storage used (bytes) |
| storage | number | Current disk storage used (bytes) |
| reserved_memory | number | Reserved memory (bytes) |
| reserved_storage | number | Reserved disk space (bytes) |
| streams | number | Current number of Streams |
| consumers | number | Current number of Consumers |
| limits.max_memory | number | Memory limit; -1 = unlimited |
| limits.max_storage | number | Disk limit; -1 = unlimited |
| limits.max_streams | number | Stream count limit; -1 = unlimited |
| limits.max_consumers | number | Consumer count limit; -1 = unlimited |
| limits.max_ack_pending | number | Max pending ACK messages; -1 = unlimited |
| limits.duplicate_window_max | number | Max deduplication window duration (nanoseconds) |
| limits.max_bytes_required | bool | Whether setting max_bytes is required |
| tiers | object? | Multi-tenant tier configuration (optional) |
| error | object? | Present on error |

$JS.API.ACCOUNT.INFO

Returns the same format as $JS.API.INFO, querying the JetStream usage of the current account.


Stream Management

[Figure: Stream management interaction diagram]

$JS.API.STREAM.CREATE.<stream>

Create a new Stream.

Request fields (StreamConfig):

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| name | string | yes | | Stream name; only letters, digits, -, _ allowed |
| subjects | string[] | | [] | Bound Subject list; wildcards supported |
| storage | string | | file | Storage type: file / memory |
| retention | string | | limits | Retention policy: limits / workqueue / interest |
| max_msgs | number | | -1 | Max message count; -1 = unlimited |
| max_bytes | number | | -1 | Max storage bytes; -1 = unlimited |
| max_age | number | | 0 | Max message retention duration (nanoseconds); 0 = unlimited |
| max_msg_size | number | | -1 | Max single message size (bytes); -1 = unlimited |
| max_msgs_per_subject | number | | -1 | Max messages per Subject; -1 = unlimited |
| num_replicas | number | | 1 | Replica count (cluster mode) |
| description | string | | "" | Description |
| discard | string | | "" | Discard policy when limit exceeded: old / new |
| duplicate_window | number? | | | Deduplication window duration (nanoseconds) |
| deny_delete | bool | | false | Prohibit deleting individual messages |
| deny_purge | bool | | false | Prohibit purging the Stream |
| allow_rollup_hdrs | bool | | false | Allow KV rollup headers (KV Store specific) |
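Because durations such as max_age are expressed in nanoseconds, it is easy to send a value that is off by several orders of magnitude. The sketch below builds a STREAM.CREATE request from seconds; `stream_create_request` and its parameter names are illustrative assumptions, and only a subset of the fields above is set.

```python
import json

NANOS_PER_SECOND = 1_000_000_000

def stream_create_request(name, subjects, max_age_seconds=0,
                          storage="file", retention="limits"):
    """Return (subject, JSON payload) for $JS.API.STREAM.CREATE.<stream>."""
    config = {
        "name": name,
        "subjects": list(subjects),
        "storage": storage,
        "retention": retention,
        # The API expects nanoseconds; 0 means no age limit.
        "max_age": int(max_age_seconds * NANOS_PER_SECOND),
    }
    return f"$JS.API.STREAM.CREATE.{name}", json.dumps(config).encode()
```

For example, a one-hour retention is sent as 3600000000000, not 3600.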

Response fields (StreamInfoResponse):

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_create_response |
| config | object | The resulting StreamConfig; same structure as request fields |
| state.messages | number | Total message count |
| state.bytes | number | Total storage bytes |
| state.first_seq | number | Sequence number of the first message |
| state.last_seq | number | Sequence number of the last message |
| state.consumer_count | number | Current Consumer count |
| created | string | Creation time (RFC3339) |
| cluster.name | string? | Cluster name |
| cluster.leader | string? | Current leader node name |
| cluster.replicas | array? | Replica node list |
| error | object? | Present on error |

$JS.API.STREAM.UPDATE.<stream>

Update a Stream's configuration. The request and response formats are identical to those of STREAM.CREATE.

$JS.API.STREAM.INFO.<stream>

Query Stream information. Request payload is empty; response format is the same as STREAM.CREATE.

$JS.API.STREAM.DELETE.<stream>

Delete a Stream along with all its messages and Consumers. Request payload is empty.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_delete_response |
| success | bool | Whether deletion succeeded |
| error | object? | Present on error |

$JS.API.STREAM.LIST

List full information for all Streams.

Request fields:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| offset | number? | | 0 | Pagination offset |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_list_response |
| total | number | Total Stream count |
| offset | number | Current page offset |
| limit | number | Max items per page |
| streams | array | List of StreamInfo; same structure as STREAM.CREATE response |
| error | object? | Present on error |
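The offset, limit, and total fields are enough to page through a large listing. Here is a hedged sketch of that loop; the `request` callable is a stand-in for whatever function sends a JSON request and returns the decoded response, and is not part of any real client API.

```python
from typing import Callable, Iterator

def iter_streams(request: Callable[[str, dict], dict]) -> Iterator[dict]:
    """Yield every StreamInfo by following the pagination fields."""
    offset = 0
    while True:
        page = request("$JS.API.STREAM.LIST", {"offset": offset})
        streams = page.get("streams") or []
        yield from streams
        offset += len(streams)
        # Stop on an empty page or once every entry has been seen.
        if not streams or offset >= page.get("total", 0):
            break
```

The same loop applies to STREAM.NAMES, CONSUMER.LIST, and CONSUMER.NAMES by changing the Subject and the name of the list field.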

$JS.API.STREAM.NAMES

List all Stream names only (no full info). Request fields same as STREAM.LIST.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_names_response |
| total | number | Total Stream count |
| offset | number | Current page offset |
| limit | number | Max items per page |
| streams | string[] | List of Stream names |
| error | object? | Present on error |

$JS.API.STREAM.PURGE.<stream>

Purge messages from a Stream, with optional Subject or sequence filtering.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| filter | string? | | Only purge messages matching this Subject |
| seq | number? | | Only purge messages with sequence number less than this value |
| keep | number? | | Keep the most recent N messages; purge the rest |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_purge_response |
| success | bool | Whether the operation succeeded |
| purged | number | Actual number of messages deleted |
| error | object? | Present on error |

$JS.API.STREAM.MSG.GET.<stream>

Fetch a specific message from a Stream by sequence number or Subject without updating any consumption progress.

Request fields (choose one):

| Field | Type | Description |
| --- | --- | --- |
| seq | number? | Fetch by global sequence number |
| last_by_subj | string? | Fetch the last message for a Subject |
| next_by_subj | string? | Fetch the next message for a Subject after the current position |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_msg_get_response |
| message.subject | string | Message Subject |
| message.seq | number | Global sequence number in the Stream |
| message.data | string | Message payload (base64 encoded) |
| message.time | string | Publish time (RFC3339) |
| message.headers | object? | Message headers (key-value pairs) |
| error | object? | Present on error |

$JS.API.STREAM.MSG.DELETE.<stream>

Delete a specific message from a Stream.

Request fields:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| seq | number | yes | | Sequence number of the message to delete |
| no_erase | bool | | false | true = mark as deleted only; false = overwrite with random data |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_msg_delete_response |
| success | bool | Whether deletion succeeded |
| error | object? | Present on error |

$JS.API.STREAM.SNAPSHOT.<stream>

Create a Stream snapshot; the server streams data to deliver_subject.

Request fields:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| deliver_subject | string | yes | | Subject to receive snapshot data; client must SUB first |
| no_consumers | bool | | false | true = exclude Consumer info from snapshot |
| check_msgs | bool | | false | true = verify message integrity before snapshotting |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_snapshot_response |
| success | bool | Whether the snapshot was successfully started |
| error | object? | Present on error |

$JS.API.STREAM.RESTORE.<stream>

Restore a Stream from a snapshot. Request payload is empty; after the response the client pushes snapshot data to the server. Response format same as STREAM.SNAPSHOT.

$JS.API.STREAM.LEADER.STEPDOWN.<stream>

Trigger the current Stream's Raft leader to step down. Request payload is empty.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.stream_leader_stepdown_response |
| success | bool | Whether the stepdown was triggered |
| error | object? | Present on error |

$JS.API.STREAM.PEER.REMOVE.<stream>

Remove a peer node from the Stream's replica set.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| peer | string | yes | Name of the peer node to remove |

Response format same as STREAM.LEADER.STEPDOWN.


Consumer Management

[Figure: Consumer management interaction diagram]

$JS.API.CONSUMER.CREATE.<stream> / $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer>

Create a Consumer. DURABLE.CREATE creates a durable Consumer; CREATE creates an ephemeral Consumer.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| stream_name | string | yes | Target Stream name |
| action | string? | | create (only create) / update (only update) / empty (create or update) |
| config.durable_name | string? | | Durable Consumer name; omit for ephemeral |
| config.name | string? | | Consumer name (new style, NATS 2.9+) |
| config.description | string? | | Description |
| config.deliver_subject | string? | | Push Subject for push Consumer; omit for pull Consumer |
| config.deliver_policy | string | | Deliver policy; default all |
| config.ack_policy | string | | ACK policy; default explicit |
| config.ack_wait | number? | | ACK timeout (nanoseconds); unACKed messages are redelivered after this |
| config.max_deliver | number? | | Max redelivery attempts; -1 = unlimited |
| config.replay_policy | string | | Replay policy; default instant |
| config.filter_subject | string? | | Only consume messages matching this Subject (single) |
| config.filter_subjects | string[]? | | Only consume messages matching these Subjects (multiple) |
| config.opt_start_seq | number? | | Start sequence when deliver_policy is by_start_sequence |
| config.opt_start_time | string? | | Start time when deliver_policy is by_start_time (RFC3339) |
| config.max_waiting | number? | | Max concurrent pull requests (pull Consumer) |
| config.max_ack_pending | number? | | Max pending ACK messages; -1 = unlimited |
| config.flow_control | bool | | Enable flow control (push Consumer) |
| config.idle_heartbeat | number? | | Idle heartbeat interval (nanoseconds, push Consumer) |
| config.backoff | number[]? | | Redelivery backoff sequence (nanoseconds) |
| config.pause_until | string? | | Pause delivery until this time (RFC3339) |
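The choice between the two create Subjects and between push and pull follows directly from which fields are set. The sketch below shows one way to assemble the request; `consumer_create_request` and the 30-second ack_wait default are assumptions for the example, not values taken from this reference.

```python
import json

def consumer_create_request(stream, durable_name=None, deliver_subject=None,
                            ack_wait_seconds=30):
    """Return (subject, JSON payload) for creating a Consumer on `stream`."""
    config = {
        "deliver_policy": "all",
        "ack_policy": "explicit",
        "replay_policy": "instant",
        "ack_wait": int(ack_wait_seconds * 1_000_000_000),  # nanoseconds
    }
    if durable_name:
        # Durable: the name goes into both the config and the Subject.
        config["durable_name"] = durable_name
        subject = f"$JS.API.CONSUMER.DURABLE.CREATE.{stream}.{durable_name}"
    else:
        subject = f"$JS.API.CONSUMER.CREATE.{stream}"
    if deliver_subject:
        # Setting deliver_subject makes this a push Consumer; omitting it means pull.
        config["deliver_subject"] = deliver_subject
    body = {"stream_name": stream, "config": config}
    return subject, json.dumps(body).encode()
```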

Response fields (ConsumerInfoResponse):

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_create_response |
| stream_name | string | Parent Stream name |
| name | string | Consumer name |
| config | object | Consumer configuration; same structure as request config |
| created | string | Creation time (RFC3339) |
| delivered.consumer_seq | number | Consumer sequence of the last delivered message |
| delivered.stream_seq | number | Stream sequence of the last delivered message |
| ack_floor.consumer_seq | number | Consumer sequence of the highest contiguously ACKed message (everything at or below it is ACKed) |
| ack_floor.stream_seq | number | Stream sequence of the highest contiguously ACKed message |
| num_ack_pending | number | Number of messages pending ACK |
| num_redelivered | number | Number of redelivered messages |
| num_waiting | number | Number of waiting pull requests |
| num_pending | number | Number of messages not yet delivered |
| cluster | object? | Cluster info |
| paused | bool? | Whether delivery is paused |
| pause_remaining | number? | Remaining pause duration (nanoseconds) |
| error | object? | Present on error |

$JS.API.CONSUMER.INFO.<stream>.<consumer>

Query Consumer information. Request payload is empty; response format same as CONSUMER.CREATE.

$JS.API.CONSUMER.DELETE.<stream>.<consumer>

Delete a Consumer. Request payload is empty.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_delete_response |
| success | bool | Whether deletion succeeded |
| error | object? | Present on error |

$JS.API.CONSUMER.LIST.<stream>

List full information for all Consumers on a Stream. Request fields same as STREAM.LIST (pagination).

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_list_response |
| total | number | Total Consumer count |
| offset | number | Current page offset |
| limit | number | Max items per page |
| consumers | array | List of ConsumerInfo; same structure as CONSUMER.CREATE response |
| error | object? | Present on error |

$JS.API.CONSUMER.NAMES.<stream>

List all Consumer names on a Stream. Request fields same as STREAM.LIST (pagination).

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_names_response |
| total | number | Total Consumer count |
| offset | number | Current page offset |
| limit | number | Max items per page |
| consumers | string[] | List of Consumer names |
| error | object? | Present on error |

$JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>

Pull Consumer fetch. The server delivers messages directly to the request's reply-to address; each message's reply-to field is the ACK address.

Request fields:

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| batch | number | | 1 | Max messages to pull at once |
| expires | number? | | | Wait timeout (nanoseconds); server returns 408 when no messages available |
| max_bytes | number? | | | Max total bytes to return in one fetch |
| no_wait | bool | | false | true = return immediately when no messages are available |
| idle_heartbeat | number? | | | Heartbeat interval while waiting (nanoseconds) |

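Messages are delivered as standard MSG (or HMSG) frames carrying the original payload, not a JSON body. When a request ends without messages or fails, the server instead sends a header-only HMSG whose status line carries one of these codes:

| Status | Description |
| --- | --- |
| 404 | No messages available (returned for no_wait requests) |
| 408 | Request timed out; no messages available |
| 409 | Conflict; the Consumer has expired or been deleted |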
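A pull client has to tell real messages apart from status replies. Assuming the status arrives in the first line of the NATS header block (for example `NATS/1.0 408 Request Timeout`), the following sketch builds the request body and extracts the code; both function names are illustrative.

```python
import json

def pull_request_payload(batch=1, expires_seconds=None, no_wait=False) -> bytes:
    """Build the JSON body for $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer>."""
    body = {"batch": batch}
    if expires_seconds is not None:
        body["expires"] = int(expires_seconds * 1_000_000_000)  # nanoseconds
    if no_wait:
        body["no_wait"] = True
    return json.dumps(body).encode()

def parse_status(header_block: bytes):
    """Return the numeric status from a 'NATS/1.0 <code> <text>' line, or None."""
    first_line = header_block.split(b"\r\n", 1)[0].decode()
    parts = first_line.split(" ", 2)
    if len(parts) >= 2 and parts[0] == "NATS/1.0" and parts[1].isdigit():
        return int(parts[1])
    return None  # plain version line: an ordinary message with headers
```

A fetch loop would typically stop waiting for more messages on this request as soon as `parse_status` returns a value.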

$JS.API.CONSUMER.LEADER.STEPDOWN.<stream>.<consumer>

Trigger the Consumer's Raft leader to step down. Request payload is empty.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_leader_stepdown_response |
| success | bool | Whether the stepdown was triggered |
| error | object? | Present on error |

$JS.API.CONSUMER.PAUSE.<stream>.<consumer>

Pause or resume Consumer message delivery.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| pause_until | string? | | Pause until this time (RFC3339); omit to resume immediately |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Fixed value io.nats.jetstream.api.v1.consumer_pause_response |
| paused | bool | Whether delivery is currently paused |
| pause_until | string? | Pause expiry time; absent when not paused |
| error | object? | Present on error |

Message Consumption and Publishing

Message Delivery (Push Consumer)

Each message delivered by the server has the ACK address in the reply-to field:

```text
MSG mycons.delivery 1 $JS.ACK.mystream.mycons.1.5.5.1700000000000000000.0 15\r\n
Hello JetStream\r\n
```

Publish with ACK (Publish ACK)

A normal PUB has no persistence confirmation. Publishing with a reply-to causes the server to reply after the message is written to the Stream.

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| stream | string | Name of the Stream the message was written to |
| seq | number | Global sequence number of the message in the Stream |
| duplicate | bool | Whether the message is a duplicate (same Nats-Msg-Id within the deduplication window) |

For deduplicated publishing, include Nats-Msg-Id in the Header:

```text
HPUB orders.new _INBOX.1 36 61\r\n
NATS/1.0\r\n
Nats-Msg-Id: order-123\r\n
\r\n
{"item":"widget","qty":5}\r\n
```
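The two numbers in the HPUB control line are the size of the header block (including the NATS/1.0 line and the blank line that ends it) and the total size of headers plus payload. Since they are easy to miscount by hand, here is a small sketch that computes them; `build_hpub` is a hypothetical helper name.

```python
def build_hpub(subject: str, reply_to: str, headers: dict, payload: bytes) -> bytes:
    """Build an HPUB frame, computing header and total sizes from the content."""
    header_block = b"NATS/1.0\r\n"
    for key, value in headers.items():
        header_block += f"{key}: {value}\r\n".encode()
    header_block += b"\r\n"  # an empty line terminates the header block
    total = len(header_block) + len(payload)
    control = f"HPUB {subject} {reply_to} {len(header_block)} {total}\r\n".encode()
    return control + header_block + payload + b"\r\n"
```

For the order-123 message with the 25-byte JSON payload, this yields a 36-byte header block and a total size of 61.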

ACK Mechanism

[Figure: ACK interaction diagram]

ACK address format:

```text
$JS.ACK.<stream>.<consumer>.<delivered_count>.<stream_seq>.<consumer_seq>.<timestamp>.<pending_msgs>
```

ACK address field meanings:

| Field | Description |
| --- | --- |
| delivered_count | Number of times this message has been delivered (increments on redelivery) |
| stream_seq | Global sequence number of the message in the Stream |
| consumer_seq | Sequence number of the message within the Consumer |
| timestamp | Timestamp of the message, in nanoseconds since the Unix epoch |
| pending_msgs | Number of messages currently pending for this Consumer |
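Because all delivery metadata is encoded in the reply-to Subject, a client can recover it without any extra request. The following sketch parses the nine-token form shown above; `AckInfo` and `parse_ack_subject` are names invented for this example.

```python
from typing import NamedTuple

class AckInfo(NamedTuple):
    stream: str
    consumer: str
    delivered_count: int
    stream_seq: int
    consumer_seq: int
    timestamp_ns: int
    pending_msgs: int

def parse_ack_subject(reply_to: str) -> AckInfo:
    """Split a $JS.ACK reply-to Subject into its metadata fields."""
    tokens = reply_to.split(".")
    if len(tokens) != 9 or tokens[:2] != ["$JS", "ACK"]:
        raise ValueError(f"not a 9-token $JS.ACK subject: {reply_to!r}")
    # Tokens 2 and 3 are names; the remaining five are decimal integers.
    return AckInfo(tokens[2], tokens[3], *(int(t) for t in tokens[4:]))
```

A delivered_count greater than 1 tells the handler it is looking at a redelivery.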

ACK operations (PUB different payloads to the ACK address):

| Payload | Type | Request Fields | Description |
| --- | --- | --- | --- |
| Empty or +ACK | Ack | none | Message processed successfully |
| -NAK | Nak | delay (number?, nanoseconds, optional) | Reject; redeliver immediately or after delay |
| +WPI | Progress | none | Processing in progress; reset ack_wait timer to prevent redelivery |
| +NXT | Next | batch (number, default 1) | Ack and immediately request the next message (pull Consumer only) |
| +TERM | Term | none | Terminate; no redelivery |
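Each acknowledgement is an ordinary PUB to the ACK address. The sketch below builds the payloads using the strings NATS defines (+ACK, -NAK, +WPI, +NXT, +TERM), with optional fields appended as a JSON object after a space; the helper names are illustrative, and the JSON-suffix form is stated here as an assumption about the encoding.

```python
import json

def nak_payload(delay_seconds=None) -> bytes:
    """-NAK, optionally asking for redelivery after a delay."""
    if delay_seconds is None:
        return b"-NAK"
    delay_ns = int(delay_seconds * 1_000_000_000)
    return b"-NAK " + json.dumps({"delay": delay_ns}).encode()

def next_payload(batch=1) -> bytes:
    """+NXT: ack this message and request `batch` more (pull Consumers)."""
    return b"+NXT " + json.dumps({"batch": batch}).encode()

def ack_frame(ack_subject: str, payload: bytes = b"+ACK") -> bytes:
    """Wrap an ACK payload in a PUB frame addressed to the ACK Subject."""
    return f"PUB {ack_subject} {len(payload)}\r\n".encode() + payload + b"\r\n"
```

For instance, `ack_frame(subject, b"+WPI")` can be sent periodically while a slow handler is still working on a message.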

Direct Get

Read a message directly from a Stream, bypassing the Consumer layer without updating any consumption progress. Suited for random access. The response is in HMSG format with metadata in headers and the raw message content as the body.

$JS.API.DIRECT.GET.<stream>

Request fields (choose one):

| Field | Type | Description |
| --- | --- | --- |
| seq | number? | Fetch by global sequence number |
| last_by_subj | string? | Fetch the last message for a Subject |
| next_by_subj | string? | Fetch the next message for a Subject after a specified position |
| start_time | string? | Fetch the first message after a specified time (RFC3339) |

Response HMSG headers:

| Header | Description |
| --- | --- |
| Nats-Stream | Stream name |
| Nats-Sequence | Global sequence number of the message |
| Nats-Subject | Subject of the message |
| Nats-Time-Stamp | Publish time (RFC3339) |
| Nats-Num-Pending | Number of additional messages remaining (optional) |
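Since Direct Get returns its metadata as headers rather than JSON, a client needs a small header parser. This is a minimal sketch that handles one value per header name; `parse_headers` is an illustrative name, and repeated headers are not handled.

```python
def parse_headers(header_block: bytes) -> dict:
    """Parse a NATS header block into a dict of name -> value."""
    lines = header_block.decode().split("\r\n")
    headers = {}
    for line in lines[1:]:  # skip the leading NATS/1.0 version line
        if not line:
            break  # the empty line ends the header block
        # Split on the first colon only, so RFC3339 timestamps stay intact.
        key, _, value = line.partition(":")
        headers[key.strip()] = value.strip()
    return headers
```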

$JS.API.DIRECT.GET.<stream>.<subject>

Fetch the latest message for the specified Subject. Request payload is empty; response format same as above.

$JS.API.DIRECT.GET.LAST.<stream>.<subject>

Alias for DIRECT.GET.<stream>.<subject> with the same semantics; response format same as above.


Advisory Events

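The server publishes advisory events under $JS.EVENT.ADVISORY.> whenever Stream or Consumer state changes (fire-and-forget, no reply-to). Clients subscribe to these Subjects to monitor cluster state. Because the event Subjects contain several tokens, subscribe with the > wildcard; * matches only a single token.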

Common payload fields for all events:

| Field | Type | Description |
| --- | --- | --- |
| type | string | Event type identifier, e.g. io.nats.jetstream.advisory.v1.stream_action |
| id | string | Unique event ID (UUID) |
| timestamp | string | Event time (RFC3339) |
| stream | string | Related Stream name (Stream-type events) |
| consumer | string? | Related Consumer name (Consumer-type events) |
| action | string? | Operation type, e.g. create / delete / update |
| client.acc | string? | Account name of the operation initiator |
| client.user | string? | User name of the operation initiator |

Complete event list:

| Event | Subject | Triggered When |
| --- | --- | --- |
| Stream created | $JS.EVENT.ADVISORY.STREAM.CREATED.<stream> | A Stream is created |
| Stream deleted | $JS.EVENT.ADVISORY.STREAM.DELETED.<stream> | A Stream is deleted |
| Stream updated | $JS.EVENT.ADVISORY.STREAM.UPDATED.<stream> | Stream config changes |
| Stream leader elected | $JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.<stream> | Leader changes in cluster mode |
| Consumer created | $JS.EVENT.ADVISORY.CONSUMER.CREATED.<stream>.<consumer> | A Consumer is created |
| Consumer deleted | $JS.EVENT.ADVISORY.CONSUMER.DELETED.<stream>.<consumer> | A Consumer is deleted |
| Consumer leader elected | $JS.EVENT.ADVISORY.CONSUMER.LEADER_ELECTED.<stream>.<consumer> | Consumer leader changes in cluster mode |
| API audit | $JS.EVENT.ADVISORY.API | After every API call; records operation log |
| Snapshot started | $JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.<stream> | A snapshot task starts |
| Snapshot completed | $JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.<stream> | A snapshot task completes |
| Restore started | $JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.<stream> | A restore task starts |
| Restore completed | $JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.<stream> | A restore task completes |

KV Store

KV Store is a key-value store implemented on JetStream Streams. The underlying Stream is named KV_<bucket>. Each write to a key is a message with Subject $KV.<bucket>.<key>; versions are managed by sequence number.

Bucket Management

Creating and deleting buckets are Stream operations on the underlying KV_<bucket> Stream. Use $JS.API.STREAM.CREATE.KV_<bucket> with subjects: ["$KV.<bucket>.>"], max_msgs_per_subject: 1 (keep only the latest value), and allow_rollup_hdrs: true.
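Put together, creating a bucket is just a STREAM.CREATE request with a conventional name and Subject filter. The sketch below assembles that request with only the fields named above; `kv_bucket_create_request` is a hypothetical helper, and a real client may set additional Stream options.

```python
import json

def kv_bucket_create_request(bucket: str):
    """Return (subject, JSON payload) that creates the Stream backing a KV bucket."""
    stream = f"KV_{bucket}"
    config = {
        "name": stream,
        "subjects": [f"$KV.{bucket}.>"],  # every key of the bucket
        "max_msgs_per_subject": 1,        # keep only the latest value per key
        "allow_rollup_hdrs": True,
    }
    return f"$JS.API.STREAM.CREATE.{stream}", json.dumps(config).encode()
```

Raising max_msgs_per_subject above 1 would keep that many historical revisions of each key.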

KV Put

Publish a message to $KV.<bucket>.<key>. Include a reply-to to receive a write confirmation.

Response fields (KvPutResponse):

| Field | Type | Description |
| --- | --- | --- |
| stream | string | Underlying Stream name (KV_<bucket>) |
| seq | number | Sequence number after write (the latest version number of the key) |
| duplicate | bool | Whether this is a duplicate write |

KV Get

Read the latest value for a key using Direct Get: $JS.API.DIRECT.GET.LAST.KV_<bucket>.$KV.<bucket>.<key>.

Response is HMSG with the following headers:

| Header | Description |
| --- | --- |
| Nats-Stream | Underlying Stream name |
| Nats-Subject | Full Subject ($KV.<bucket>.<key>) |
| Nats-Sequence | Current version sequence number |
| Nats-Time-Stamp | Write time |
| KV-Operation | Empty = PUT, DEL = deleted, PURGE = purged |
| Nats-Num-Pending | Number of historical revisions of the key |

KV Delete

Publish an empty message with KV-Operation: DEL header to $KV.<bucket>.<key> to mark the key as deleted. Write confirmation same as Put.

KV Purge

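Publish an empty message with the KV-Operation: PURGE header to $KV.<bucket>.<key> to remove all historical revisions of the key. NATS clients also set the Nats-Rollup: sub header on this message, which is what allow_rollup_hdrs on the bucket Stream permits. Write confirmation is the same as for Put.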

KV Keys (List All Keys)

Subscribe to $KV.<bucket>.> and consume via a Consumer, filtering out KV-Operation: DEL and PURGE entries to obtain the list of currently live keys. Delivered as a stream of push messages; no unified JSON response body.
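The filtering step can be sketched independently of the transport. Assuming the entries arrive in Stream order as (subject, KV-Operation) pairs, with an empty operation meaning PUT, the live keys are those whose most recent entry is not a DEL or PURGE marker; `live_keys` is an illustrative name.

```python
def live_keys(bucket: str, entries) -> list:
    """Return the sorted keys whose latest entry is not a delete or purge marker."""
    prefix = f"$KV.{bucket}."
    latest = {}
    for subject, operation in entries:  # entries must be in Stream order
        if subject.startswith(prefix):
            # Later entries overwrite earlier ones, so only the newest survives.
            latest[subject[len(prefix):]] = operation
    return sorted(k for k, op in latest.items() if op not in ("DEL", "PURGE"))
```

Tracking only the latest operation per key matters when a bucket keeps history: a key that was deleted and later written again is live.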

KV Watch

Create a push Consumer filtered on $KV.<bucket>.<key>, or on $KV.<bucket>.> to watch every key in the bucket. Each change to a matching key is delivered as a push message in the KV Get format.


Object Store

Object Store is an object storage implementation built on JetStream Streams. The underlying Stream is named OBJ_<bucket>. It supports storing arbitrarily large binary objects via chunked upload. Each object consists of two parts:

  • Metadata: stored at the $OBJ.<bucket>.info.<object> Subject
  • Data chunks: stored at the $OBJ.<bucket>.chunks.<nonce> Subject

Object Bucket Management

Use $JS.API.STREAM.CREATE.OBJ_<bucket> with subjects: ["$OBJ.<bucket>.>"].

Upload an Object (Put)

Upload is a two-step process: send metadata first, then send data in chunks.

Metadata fields (ObjectMeta, published to $OBJ.<bucket>.info.<object>):

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | yes | Object name |
| description | string | | Description |
| nonce | string | yes | Unique random identifier; data chunks are published to $OBJ.<bucket>.chunks.<nonce> |
| bucket | string | yes | Bucket name |
| chunks | number | yes | Total number of chunks |
| size | number | yes | Total object size (bytes) |
| headers | object? | | Custom headers |
| options | object? | | Extended options |

Chunks are published sequentially to $OBJ.<bucket>.chunks.<nonce>; the server assembles the object automatically after the last chunk.
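The client-side part of an upload, splitting the data and filling in the metadata, can be sketched as follows. The function name `plan_upload`, the 128 KiB default chunk size, and the hex nonce format are assumptions made for illustration; nothing is published here.

```python
import secrets

def plan_upload(bucket: str, name: str, data: bytes, chunk_size: int = 128 * 1024):
    """Split `data` into chunks and build the matching ObjectMeta and Subjects."""
    nonce = secrets.token_hex(8)  # random identifier shared by all chunks
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    meta = {
        "name": name,
        "nonce": nonce,
        "bucket": bucket,
        "chunks": len(chunks),
        "size": len(data),
    }
    meta_subject = f"$OBJ.{bucket}.info.{name}"
    chunk_subject = f"$OBJ.{bucket}.chunks.{nonce}"
    return meta_subject, meta, chunk_subject, chunks
```

The caller would publish `meta` to `meta_subject` and then each element of `chunks`, in order, to `chunk_subject`.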

Response fields after upload completes (ObjectInfo):

| Field | Type | Description |
| --- | --- | --- |
| name | string | Object name |
| description | string | Description |
| nonce | string | Unique identifier |
| bucket | string | Bucket name |
| chunks | number | Total chunk count |
| size | number | Total size (bytes) |
| digest | string | Content digest (sha-256=...) |
| deleted | bool | Whether the object has been deleted |
| headers | object? | Custom headers |

Download an Object (Get)

Send a request to $OBJ.<bucket>; the server streams chunks to deliver_subject.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | yes | Object name |
| deliver_subject | string | yes | Subject to receive data chunks; client must SUB first |

No JSON response body; data is pushed as standard MSG chunks to deliver_subject.

Get Object Metadata (Info)

Send a request to $OBJ.<bucket>; returns ObjectInfo with the same fields as the post-upload response.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | yes | Object name |

Delete an Object

Send a request to $OBJ.<bucket>.

Request fields:

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | yes | Name of the object to delete |

Response fields:

| Field | Type | Description |
| --- | --- | --- |
| success | bool | Whether deletion succeeded |

List All Objects

Send a request to $OBJ.<bucket> with an empty payload.

Response fields (ObjectListResponse):

| Field | Type | Description |
| --- | --- | --- |
| bucket | string | Bucket name |
| objects | array | List of ObjectInfo; same structure as post-upload response |

Watch for Changes

Subscribe to $OBJ.<bucket>.info.>. Receive an ObjectInfo push each time an object is uploaded or deleted (deleted: true indicates deletion).


JetStream Domain

When the server is configured with a JetStream Domain, the API Subject prefix changes from $JS.API to $JS.<domain>.API:

```text
# Default
PUB $JS.API.STREAM.INFO.mystream _INBOX.1 0

# With specified domain
PUB $JS.hub.API.STREAM.INFO.mystream _INBOX.1 0
```
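In client code the domain is usually applied in one place, when the API Subject is built. A minimal sketch, with `api_subject` as an illustrative helper name:

```python
from typing import Optional

def api_subject(operation: str, domain: Optional[str] = None) -> str:
    """Build a JetStream API Subject, inserting the domain when one is set."""
    prefix = f"$JS.{domain}.API" if domain else "$JS.API"
    return f"{prefix}.{operation}"
```

Every other part of the request (payload, reply-to, response handling) stays the same.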

Relationship with Core NATS

JetStream introduces no new wire protocol commands; protocol parsers require no modifications:

  • Messages published via ordinary PUB, if they match a Stream's Subject, are automatically captured and persisted — publishers do not need to be aware of JetStream
  • To receive a Publish ACK, simply include a reply-to in the PUB
  • Messages received by Consumers are standard MSG or HMSG; the reply-to field is the ACK address
  • All management operations are JSON requests sent to $JS.API.* Subjects

All the work of implementing JetStream compatibility is at the application layer: recognizing specific Subject prefixes, parsing JSON request bodies, executing the corresponding operations, and serializing results into JSON to reply via MSG.


Complete API Subject Reference

Stream Operations

| Operation | Subject |
| --- | --- |
| Create | $JS.API.STREAM.CREATE.<stream> |
| Update | $JS.API.STREAM.UPDATE.<stream> |
| Query info | $JS.API.STREAM.INFO.<stream> |
| List all | $JS.API.STREAM.LIST |
| List names | $JS.API.STREAM.NAMES |
| Delete | $JS.API.STREAM.DELETE.<stream> |
| Purge messages | $JS.API.STREAM.PURGE.<stream> |
| Get message | $JS.API.STREAM.MSG.GET.<stream> |
| Delete message | $JS.API.STREAM.MSG.DELETE.<stream> |
| Snapshot | $JS.API.STREAM.SNAPSHOT.<stream> |
| Restore | $JS.API.STREAM.RESTORE.<stream> |
| Step down leader | $JS.API.STREAM.LEADER.STEPDOWN.<stream> |
| Remove peer | $JS.API.STREAM.PEER.REMOVE.<stream> |

Consumer Operations

| Operation | Subject |
| --- | --- |
| Create ephemeral | $JS.API.CONSUMER.CREATE.<stream> |
| Create named | $JS.API.CONSUMER.CREATE.<stream>.<consumer> |
| Create durable | $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> |
| Query info | $JS.API.CONSUMER.INFO.<stream>.<consumer> |
| List all | $JS.API.CONSUMER.LIST.<stream> |
| List names | $JS.API.CONSUMER.NAMES.<stream> |
| Delete | $JS.API.CONSUMER.DELETE.<stream>.<consumer> |
| Pull fetch | $JS.API.CONSUMER.MSG.NEXT.<stream>.<consumer> |
| Step down leader | $JS.API.CONSUMER.LEADER.STEPDOWN.<stream>.<consumer> |
| Pause | $JS.API.CONSUMER.PAUSE.<stream>.<consumer> |

Direct Get Operations

| Operation | Subject |
| --- | --- |
| Get by request body | $JS.API.DIRECT.GET.<stream> |
| Get by subject | $JS.API.DIRECT.GET.<stream>.<subject> |
| Get last by subject | $JS.API.DIRECT.GET.LAST.<stream>.<subject> |

Other

| Operation | Subject |
| --- | --- |
| Server info | $JS.API.INFO |
| Account info | $JS.API.ACCOUNT.INFO |
| ACK address | $JS.ACK.<stream>.<consumer>.<...> |
| Flow control | $JS.FC.<stream>.> |
