核心功能
1. 邮箱类型
mq9 提供两种邮箱类型,在创建时选定,无法更改。
| 私有邮箱 | 公开邮箱 | |
|---|---|---|
mail_id | 服务端生成的 UUID(不可猜测) | 用户自定义字符串(如 task.queue) |
| 可发现性 | 不可发现——只有已知 mail_id 的 Agent 才能交互 | 自动注册到 $mq9.AI.PUBLIC.LIST,任何人可发现 |
| 适用场景 | 点对点通信、任务结果返回、私有回复 | 任务队列、广播频道、能力公告 |
安全模型: mail_id 的不可猜测性是唯一的访问控制边界。没有 bearer token,没有 ACL,没有认证头。知道 mail_id 的 Agent 即可发送消息和订阅;不知道的 Agent 无法与邮箱进行任何交互。私有邮箱应将 mail_id 视为只与预期参与方共享的密钥。
公开邮箱以可发现性换取透明性,名字即地址——选择有意义、能自描述的名字(如 vision.results、task.queue),而不是晦涩的 UUID。
2. 优先级系统
每条消息发送到三个优先级之一:critical、urgent、normal(默认,无后缀)。优先级编码在 subject 中:
$mq9.AI.MAILBOX.MSG.{mail_id}.critical # 最高优先级
$mq9.AI.MAILBOX.MSG.{mail_id}.urgent # 紧急
$mq9.AI.MAILBOX.MSG.{mail_id} # 默认(normal),无后缀排序保证:
- 同优先级内:FIFO——消息按发送顺序投递。
- 跨优先级:
critical先于urgent,urgent先于normal。排序由存储层保证,订阅者无需客户端侧排序。
各优先级的存储后端:
| 优先级 | 存储后端 | 持久化 |
|---|---|---|
critical | RocksDB | 持久化——Broker 重启后仍存在 |
urgent | RocksDB | 持久化——Broker 重启后仍存在 |
normal(默认) | Memory | 不持久化——Broker 重启后丢失,如需要可重发 |
实用建议:
critical— 中止信号、紧急指令、安全事件。这些消息必须最先到达,不能丢失。urgent— 任务中断、时效性指令。这些消息需要优先处理,不能丢失。normal(默认)— 任务分发、结果返回、审批请求。Agent 间通信的默认级别,重启后丢失可接受。
3. 先存储后推送
mq9 的投递模型与标准 pub/sub 不同。每条消息到达后的处理序列:
- 消息到达 Broker。
- 写入存储(RocksDB 或 Memory,取决于优先级)。
- 如果有订阅者当前在线:同时实时推送。
- 如果没有订阅者在线:消息等待在存储中。
- 订阅者连接时:立即按优先级顺序推送所有未过期的存储消息,然后继续实时投递。
这有两个重要的结果:
离线 Agent 不会丢失消息。 正在重启、更新或临时不可达的 Agent,在重连并订阅后立即收到缺席期间的所有消息。无需轮询,无需重放请求,无需额外恢复逻辑。
订阅等同于查询。 没有单独的 QUERY 或 FETCH 命令。每次订阅都会推送所有未过期的存储消息。服务端不追踪已读/未读状态,也不维护每个消费者的位点。从新连接再次订阅会从头重放所有未过期消息。
与相关系统的对比:
| 系统 | 持久化 | 消费者状态 | 重放模型 |
|---|---|---|---|
| NATS Core | 无——离线消息丢失 | 无 | 无 |
| NATS JetStream | 完整流式持久化 | 每消费者维护 offset、消费者组、ACK | 从任意 offset 可配置重放 |
| mq9 | 每邮箱 TTL 限定持久化 | 无 | 每次订阅推送所有未过期消息 |
mq9 位于 Core 和 JetStream 之间。它增加了足够的持久化来处理 Agent 离线的情况,而不引入 JetStream 所需的 stream、consumer、offset 和 ACK 机制。代价是 mq9 没有"从上次离开的地方继续"的语义——详见无服务端消费者状态。
4. TTL 与生命周期
TTL(生存时间)是邮箱唯一的生命周期机制,在创建时声明:
{"ttl": 3600, "public": false}行为:
- TTL 从邮箱创建时开始计时。
- TTL 到期时:邮箱自动销毁,所有存储的消息随之清理。无需手动操作。
- 没有删除邮箱的命令。预期用法是:为任务创建邮箱,使用,然后忽略——TTL 负责清理。
- TTL 在创建后不可更改,由第一次成功的
MAILBOX.CREATE调用固定。
CREATE 是幂等的。 对已存在的邮箱名(适用于公开邮箱)再次调用 MAILBOX.CREATE,服务端返回成功,不修改现有邮箱,也不重置 TTL。这一设计支持以下模式:
- 发送前确保存在: Worker 在启动时调用 CREATE 确保邮箱存在。如果邮箱由之前的 Worker 创建,此次调用是空操作。Worker 之间无需协调。
- 断开重连恢复: 重连的 Agent 可以重新调用 CREATE,无风险覆盖活跃邮箱或其中的消息。
- 多个生产者: 向同一公开邮箱发送消息的多个 Agent 可以各自独立调用 CREATE。第一个创建,其余是空操作,所有生产者无需知道谁"拥有"该邮箱。
注意幂等性针对邮箱身份,不针对 TTL 值。第二次 CREATE 请求中指定了不同的 TTL,原始 TTL 仍然保留。要更改 TTL,让邮箱过期后重新创建。
5. 竞争消费(队列组)
多个订阅者可以通过加入队列组来竞争同一邮箱的消息。每条消息只投递给组中的一个成员。
# Worker 1(订阅所有优先级消息)
nats sub '$mq9.AI.MAILBOX.MSG.task.queue.*' --queue workers
# Worker 2
nats sub '$mq9.AI.MAILBOX.MSG.task.queue.*' --queue workers使用相同队列组名(上面的 workers)的所有订阅者共享消息投递。Broker 通过负载均衡将每条消息路由到一个成员。组名任意——任何字符串都有效。
动态成员: Worker 可以随时加入或退出队列组,Broker 立即调整路由,无需任何配置变更或协调器介入。
容灾容错: 由于 mq9 使用先存储后推送,投递动作本身不会从存储中删除消息。如果 Worker 收到消息后崩溃,消息仍留在存储中,当任意组成员重连时会重新投递。需要至少一次处理的 Worker 应在成功完成后显式删除($mq9.AI.MAILBOX.DELETE.{mail_id}.{msg_id})消息。
推荐模式: 将公开邮箱与队列组结合,构建零配置分布式任务队列。邮箱名作为队列地址,可通过 $mq9.AI.PUBLIC.LIST 发现。Worker 启动时用 --queue 订阅。无需队列配置,无需 Broker 侧消费者组定义,无需协调器。
6. 幂等创建
对同一邮箱名多次调用 MAILBOX.CREATE 是安全的。行为:
- 邮箱不存在:以指定 TTL 创建。
- 邮箱已存在:返回成功,保留原始 TTL,不重置。
这一特性使 CREATE 在以下场景中可以安全使用:
- Worker 启动初始化: 每个 Worker 实例在启动时为其公开任务队列邮箱调用 CREATE。只有第一次调用会创建,后续调用是空操作。Worker 之间无需协调。
- 断开后重连: 重连的 Agent 可以重新 CREATE 其邮箱,不会丢失存储的消息或缩短 TTL。
- 多个生产者: 向同一公开邮箱发送消息的多个 Agent 可以在发送前各自独立调用 CREATE。第一个创建,其余观察到空操作。所有生产者无需知道谁"拥有"邮箱即可继续。
注意幂等性针对邮箱身份,不针对 TTL 值。第二次 CREATE 指定了不同的 TTL,原始 TTL 仍然保留。要更改 TTL,让邮箱过期后重新创建新的。
7. 无服务端消费者状态
mq9 服务端追踪零消费者状态。没有 offset,没有消费者组,没有 ACK 序列,没有"最后投递"指针。
实际含义:
- 每次订阅都会从邮箱历史记录的开始推送所有未过期消息。
- 两个独立连接订阅同一邮箱,都会收到完整的消息集。
- 没有办法订阅并只接收"新"消息——每个订阅者都从最旧的未过期消息开始。
- 取消订阅后重新订阅,会再次重放所有未过期消息。
权衡: 协议显著更简单。没有消费者位置的协商,没有 ACK 流,没有随消费者数量增长的服务端记账。这与目标工作负载匹配:AI Agent 通常在唤醒时处理邮箱中的所有消息,而不是需要从特定位置恢复。
去重: 如果订阅者必须避免跨重连重复处理同一消息,使用每条消息的 msg_id 字段。应用程序追踪已处理的 msg_id;重连后跳过已知 msg_id 的消息。服务端在每条投递的消息中提供 msg_id;追踪责任在客户端。
这是 JetStream 消费者 offset 模型的显式对照:服务端更简单,需要去重的客户端需要做更多工作。
