快速开始
本指南使用 NATS CLI 带你完整体验 mq9 的核心操作,连接公共演示服务器。无需账号、无需配置、无需 SDK——只需一个终端。
准备工作
安装 NATS CLI,这是与 mq9 交互唯一需要的工具。
连接公共服务器
RobustMQ 演示服务器地址:
nats://demo.robustmq.com:4222这是共享环境,任何知道 subject 名称的人都能订阅,请勿发送敏感数据。以下示例均连接此服务器——在每条命令中加上 -s nats://demo.robustmq.com:4222,或一次性设置环境变量:
export NATS_URL=nats://demo.robustmq.com:4222创建邮箱
邮箱是 mq9 的基本通信地址。使用 nats req(请求/回复)创建邮箱,服务端通过 NATS reply-to 返回分配的 mail_id:
nats req '$mq9.AI.MAILBOX.CREATE' '{"ttl":60}'响应:
{"mail_id":"m-7f3a1c9e2b"}mail_id 是唯一的访问凭证。任何知道它的人都能向这个邮箱发消息或订阅。私有通信场景下请妥善保管。
此处 TTL 设为 60 秒仅供演示方便。生产环境中请根据任务的预期生命周期选择合适的 TTL——TTL 到期后邮箱及其所有消息自动销毁,无需手动清理。
按优先级发送消息
mq9 支持三个优先级:critical、urgent、normal(默认,无后缀)。发送消息的 subject 格式为:
$mq9.AI.MAILBOX.MSG.{mail_id}.{priority} # urgent 或 critical
$mq9.AI.MAILBOX.MSG.{mail_id} # 默认(normal),无后缀将 m-7f3a1c9e2b 替换为上一步拿到的 mail_id:
# 最高优先级——立即处理;适用于中止信号、紧急指令、安全事件
nats pub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b.critical' '{"type":"abort","task_id":"t-001"}'
# 紧急——适用于任务中断、时效性指令
nats pub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b.urgent' '{"type":"interrupt","task_id":"t-002"}'
# 默认优先级(normal)——常规通信;适用于任务分发、结果返回
nats pub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b' '{"type":"task","payload":"process dataset A"}'发送是即发即忘(nats pub)。服务端立即将每条消息写入存储,发送方无需等待订阅者在线。
禁止通配符发布。 Subject
$mq9.AI.MAILBOX.MSG.*.*会被服务端拒绝,发送时必须指定精确的mail_id。
订阅接收消息
订阅邮箱消息:
# 订阅所有优先级(critical、urgent、normal)
nats sub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b.*'上面发送的消息会在订阅后立即推送——无论订阅者发送时是否在线。这是 mq9 的先存储后推送语义:每次订阅都会先按优先级顺序(critical → urgent → normal,同级 FIFO)推送所有未过期的存储消息,然后切换到实时推送新消息。
这意味着订阅先于还是晚于消息发送,结果是一样的。Agent 网络中断后重连,会自动收到所有错过的消息。
只订阅某一优先级:
nats sub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b.urgent'
nats sub '$mq9.AI.MAILBOX.MSG.m-7f3a1c9e2b.critical'列出消息元数据
不消费消息,只查看邮箱中当前存储的消息——向 LIST subject 发送请求:
nats req '$mq9.AI.MAILBOX.LIST.m-7f3a1c9e2b' '{}'响应:
{
"messages": [
{"msg_id": "msg-001", "priority": "critical", "ts": 1712600001},
{"msg_id": "msg-002", "priority": "urgent", "ts": 1712600002},
{"msg_id": "msg-003", "priority": "normal", "ts": 1712600003}
]
}响应返回每条消息的 msg_id、priority 和 ts(Unix 时间戳)。不包含消息体——LIST 用于检视,不用于检索。使用 msg_id 可对特定消息执行删除操作。
删除消息
在邮箱 TTL 到期前删除某条特定消息:
nats req '$mq9.AI.MAILBOX.DELETE.m-7f3a1c9e2b.msg-002' '{}'竞争消费场景下,Worker 完成任务后可以显式删除任务消息来确认完成。Subject 格式为:
$mq9.AI.MAILBOX.DELETE.{mail_id}.{msg_id}创建公开邮箱
公开邮箱的 mail_id 由用户自定义——你选择的名字就是地址。适用于多方需要共同发现地址、无需带外协调的共享任务队列或能力公告。
nats req '$mq9.AI.MAILBOX.CREATE' '{
"ttl": 3600,
"public": true,
"name": "task.queue",
"desc": "共享 Worker 任务队列"
}'响应:
{"mail_id":"task.queue"}mail_id 就是你提供的 name。邮箱自动注册到 $mq9.AI.PUBLIC.LIST,任何订阅该系统地址的 Agent 都可以发现它。TTL 到期后自动从列表中移除。
CREATE 是幂等的:如果名为 task.queue 的邮箱已存在,此调用返回成功但不重置 TTL。Worker 启动时可以安全地调用它而不必担心覆盖已有邮箱。
队列组(竞争消费)
当多个 Worker 以相同队列组名订阅同一邮箱时,mq9 将每条消息只投递给其中一个 Worker。打开两个终端,各运行相同命令:
终端 1:
nats sub '$mq9.AI.MAILBOX.MSG.task.queue.*' --queue workers终端 2:
nats sub '$mq9.AI.MAILBOX.MSG.task.queue.*' --queue workers发送几条消息:
nats pub '$mq9.AI.MAILBOX.MSG.task.queue' '{"task":"job-1"}'
nats pub '$mq9.AI.MAILBOX.MSG.task.queue' '{"task":"job-2"}'
nats pub '$mq9.AI.MAILBOX.MSG.task.queue' '{"task":"job-3"}'每条消息只会出现在一个终端——Broker 将它们分配到队列组中。Worker 可以随时加入或退出,队列自动调整,无需任何配置变更。
由于 mq9 使用先存储后推送,如果 Worker 在删除任务消息之前崩溃,消息仍保留在存储中,当任意组成员重连时会重新投递。将公开邮箱与队列组结合,即可实现零配置、容灾容错的任务队列。
