跳转到内容

RocketMQ on Kubernetes

30 道题
分类
中间件
题目数
30 道
已阅读 0 / 30 题
1 RocketMQ 的核心架构包含哪些组件,各自承担什么职责

答案:

RocketMQ 核心架构由 NameServer、Broker(Master/Slave)、Producer、Consumer 四类组件构成,其中 NameServer 承担路由发现与注册中心职责,Broker 负责消息存储与投递,Producer 生产消息,Consumer 消费消息。

[分层展开]

  • NameServer:无状态轻量级路由注册中心;集群内各节点独立运行、互不通信;Broker 启动时向所有 NameServer 注册 Topic 路由信息;Producer 与 Consumer 从 NameServer 获取 Broker 地址列表;Broker 通过心跳(30 秒)保持路由注册有效。
  • Broker:消息存储与投递核心节点;分为 Master 与 Slave 角色;Master 处理读写请求,Slave 仅提供读服务;支持同步/异步刷盘、同步/异步主从复制;Broker 集群之间不直接通信。
  • Producer:消息生产者,与 NameServer 中随机一个节点建立长连接获取路由;与 Broker Master 建立连接发送消息;支持同步/异步/单向三种发送方式。
  • Consumer:消息消费者,与 NameServer 建立连接获取路由;通过 Pull 或 Pop 模式从 Broker 拉取消息;支持集群消费(Clustering)与广播消费(Broadcasting);Consumer Group 内实例共同分摊消费。

架构示意

graph TD
    subgraph 路由层["路由层"]
        NS1["NameServer-1"]
        NS2["NameServer-2"]
    end

    subgraph 客户端["客户端"]
        P["Producer"]
        C["Consumer"]
    end

    subgraph 存储层["存储层"]
        BM["Broker Master"] <-->|"复制"| BS["Broker Slave"]
    end

    NS1 & NS2 --> P
    NS1 & NS2 --> C
    P & C --> BM
    BM --> BS
2 RocketMQ 的 Topic 与 Message Queue 的设计原理是什么

答案:

Topic 是消息的逻辑分类标签,Message Queue 是 Topic 的物理分区单元;一个 Topic 可划分为多个 Message Queue,分布在多个 Broker 上,实现消息的并行生产与消费。

[分层展开]

  • Topic:消息的逻辑主题,Producer 指定 Topic 发送消息,Consumer 订阅 Topic 消费消息;全局唯一标识。
  • Message Queue:每个 Topic 可配置多个读/写队列(readQueueNums / writeQueueNums);数量决定了 Consumer Group 内实例的并行度上限;同一 Topic 的队列分布在多个 Broker 上,实现负载均衡。
  • Message Queue 分配:同一 Consumer Group 内,一个 Message Queue 同一时刻只能被一个 Consumer 实例消费;Consumer 实例数超过 Queue 数量会导致部分实例空闲;扩容 Queue 数量需在 Broker 配置中修改并重启或通过命令行动态调整。
  • Queue 数据模型:每条消息包含 Topic、QueueId、QueueOffset 三元组实现唯一索引定位。
配置项默认值说明
readQueueNums8读队列数量(影响消费并行度)
writeQueueNums8写队列数量(影响生产并行度)
perm6读写权限(2=W, 4=R, 6=RW)
# 动态更新 Topic 队列数量
./mqadmin updateTopic -n namesrv:9876 -t order-topic -r 16 -w 16 -c DefaultCluster
3 NameServer 的路由发现机制是怎样工作的

答案:

NameServer 采用无状态、去中心化路由注册架构;Broker 通过心跳主动注册 Topic 路由信息,Producer 与 Consumer 通过定时轮询从 NameServer 拉取当前路由表。

[分层展开]

  • 路由注册:Broker 启动后向集群内所有 NameServer 发送注册请求(topic、brokerAddr、readQueueNums、writeQueueNums、perm);Broker 每 30 秒向各 NameServer 发送心跳维持注册状态。
  • 路由剔除:NameServer 120 秒未收到 Broker 心跳则将该 Broker 剔除;NameServer 之间互不通信,各节点独立维护路由视图。
  • 路由发现:Producer 与 Consumer 每隔 30 秒从 NameServer 拉取路由表并缓存在本地;发送/消费前根据本地路由表选择目标 Broker 和 Queue。
  • 故障感知:NameServer 剔除故障 Broker 后,Producer/Consumer 可在下次拉取时感知变更;拉取间隔为 30 秒,即故障发现延迟最多 30 秒。
# 查看 NameServer 集群状态
./mqadmin clusterList -n namesrv:9876

# 查看路由器由信息
./mqadmin topicRoute -n namesrv:9876 -t my-topic
4 Broker 主从复制的三种模式有什么区别

答案:

Broker 主从复制分为 SYNC_MASTER、ASYNC_MASTER、ASYNC_FLUSH 三种模式,核心差异在于消息写入 Slave 前是否等待确认以及消息刷盘时机。

[分层展开]

  • ASYNC_MASTER(异步复制):Master 写入消息后立即返回 Producer 成功;后台线程异步将数据复制到 Slave;性能最高但 Master 故障时可能丢消息。
  • SYNC_MASTER(同步复制):Master 写入消息后等待 Slave 确认接收后返回 Producer 成功;可靠性最高但增加写入延迟(约 1-3ms);若 Slave 不可用则写操作失败。
  • ASYNC_FLUSH(异步刷盘):消息写入 PageCache 后即返回成功;由操作系统或后台线程定期刷盘;性能高但机器掉电可能丢消息。
  • SYNC_FLUSH(同步刷盘):消息写入 PageCache 后等待 fsync 完成再返回成功;数据不丢失但写入延迟显著升高。
维度ASYNC_MASTERSYNC_MASTERASYNC_FLUSHSYNC_FLUSH
复制方式异步同步N/AN/A
数据可靠性
写入延迟低(微秒级)中(毫秒级)高(毫秒级)
掉电风险依赖复制无(Slave 已接收)
生产推荐高性能场景高可靠场景高性能场景金融级可靠性
# Broker 配置示例
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
5 RocketMQ 支持哪些消息类型

答案:

RocketMQ 支持普通消息、顺序消息、延迟消息、事务消息和批量消息五种消息类型,分别适用于不同业务场景。

[分层展开]

  • 普通消息(Normal Message):无特殊处理,Producer 发送后 Broker 存储即完成;Consumer 通过 Push/Pull 消费;消息无序保证。
  • 顺序消息(Ordered Message):支持全局顺序与分区顺序;全局顺序将所有消息发送到同一 Message Queue;分区顺序将相同业务键(如订单 ID)路由到同一个 Queue;FIFO 保证消费。
  • 延迟消息(Delayed Message):发送时不立即可见,到达指定延迟级别后投递;支持 18 个预定义延迟级别(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h);RocketMQ 5.0 支持任意时间精度延迟消息。
  • 事务消息(Transaction Message):两阶段提交协议;先发送半消息(Half Message),执行本地事务后提交或回滚;Broker 定期回查事务状态确保最终一致性。
  • 批量消息(Batch Message):单次发送多条消息,减少网络 RTT 开销;要求批次总大小不超过 4MB,所有消息具有相同 Topic。
// 顺序消息:相同订单 ID 路由到同一个 Queue
Message msg = new Message("order-topic", orderId, body.getBytes());
producer.send(msg, (mqs, msg1, arg) -> {
    long orderId = (long) arg;
    long index = orderId % mqs.size();
    return mqs.get((int) index);
}, orderId);

// 延迟消息:延迟 5 秒
msg.setDelayTimeLevel(2);
producer.send(msg);
6 Dledger 高可用机制如何替换传统主从复制

答案:

Dledger 是 RocketMQ 基于 Raft 协议实现的高可用存储方案,将 CommittedLog 日志复制通过 Raft 共识协议管理,替代传统的主从复制模式,实现自动故障转移与强一致性数据同步。

[分层展开]

  • 架构变更:Dledger 模式下所有节点对等(Peer),通过 Raft 选举产生 Leader;Leader 处理所有写入,Follower 仅同步日志。
  • CommitLog 替代:传统 CommitLog 被 Dledger CommitLog 替代,日志条目通过 Raft 写入并复制到多副本(3 副本或 5 副本)。
  • 故障转移:Leader 故障后,Raft 协议自动触发重新选举,5 秒内选出新 Leader;新 Leader 接管写入,无需人工介入。
  • 写入流程:Producer 发送消息到 Leader;Leader 写入本地日志后通过 AppendEntries RPC 复制到 Follower;多副本确认后返回成功。
  • 与 SYNC_MASTER 相比:Dledger 支持自动切换,SYNC_MASTER 需手动切换;Dledger 基于多数派确认(N/2+1),SYNC_MASTER 基于单一 Slave 确认。
维度传统主从(SYNC_MASTER)Dledger(Raft)
切换方式手动自动
副本数1 Master + N SlaveN 节点对等
选举时间分钟级(人工)秒级(自动)
数据一致性强一致(SYNC)Raft 多数派
运维复杂度中(需脚本切换)低(自动)
适用版本4.x4.5+
# Dledger 模式 Broker 配置
enableDLegerCommitLog=true
dLegerGroup=broker-a
dLegerPeers=n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913
dLegerSelfId=n0
sendMessageThreadPoolNums=16
7 RocketMQ Operator 的架构与 Kubernetes 部署流程是怎样的

答案:

RocketMQ Operator 基于 Kubernetes Operator 模式实现 RocketMQ 集群的自动化部署、扩缩容与生命周期管理,通过 CRD 定义 NameServer、Broker 等资源并以自定义调度逻辑编排 Pod 的创建与更新。

[分层展开]

  • Operator 架构:基于 controller-runtime 框架;通过 Custom Resource(CR)描述 RocketMQ 集群拓扑;Controller 监听 CR 变更并执行 Reconcile 循环将实际状态调整为目标状态。
  • 核心 CRDNameService 定义 NameServer 集群;Broker 定义 Broker 组(主从关系);TopicTransfer 定义 Topic 自动创建与负载均衡;Console 部署 RocketMQ 控制台。
  • Controller 职责:NameServer Controller 管理 NameServer Pod 的创建、更新、滚动升级;Broker Controller 管理 Broker StatefulSet 的扩缩容与镜像升级。
  • 部署流程:安装 Operator(Helm Chart 或 YAML);创建 NameServer CR 拉起 NameServer 集群;创建 Broker CR 拉起 Broker StatefulSet;Topic 经 CR 或 API 创建;Producer/Consumer 通过 K8s Service 访问。
# NameService CR 示例
apiVersion: rocketmq.apache.org/v1alpha1
kind: NameService
metadata:
  name: name-service
spec:
  size: 2
  image: apache/rocketmq:5.3.0
  hostNetwork: false
  resources:
    requests:
      memory: 512Mi
      cpu: 250m
    limits:
      memory: 1Gi
      cpu: 500m

# Broker CR 示例
apiVersion: rocketmq.apache.org/v1alpha1
kind: Broker
metadata:
  name: broker
spec:
  size: 2
  clusterName: DefaultCluster
  brokerImage: apache/rocketmq:5.3.0
  nameServers: name-service:9876
  replicationMode: DLedger
  storage:
    size: 100Gi
    storageClassName: ssd
8 RocketMQ 在 Kubernetes 上的 StatefulSet 配置有哪些关键要点

答案:

RocketMQ Broker 与 NameServer 以 StatefulSet 形式部署在 Kubernetes 上,关键配置包括稳定网络标识、持久化存储绑定、反亲和性调度与资源限制。

[分层展开]

  • 稳定网络标识:StatefulSet 为每个 Pod 分配固定网络标识(如 broker-0.broker-svc.default.svc.cluster.local);Dledger 模式依赖固定标识进行 Raft 通信。
  • PodManagementPolicy:使用 OrderedReady 策略确保 Pod 顺序启动,Dledger 场景下确保 n0 先启动以完成选举。
  • PVC 模板:通过 volumeClaimTemplates 为每个 Pod 自动创建 PVC;Pod 重调度后 PVC 自动重新挂载,数据不丢失。
  • 反亲和性(anti-affinity):通过 podAntiAffinity 将同一 Broker Group 的 Pod 分散到不同节点,避免单点故障。
  • Headless Service:配置 clusterIP: None 的 Headless Service,为 StatefulSet 提供 DNS 解析;Dledger 节点通过 DNS 发现彼此。
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: broker
spec:
  serviceName: broker-svc
  replicas: 3
  podManagementPolicy: OrderedReady
  selector:
    matchLabels:
      app: broker
  template:
    metadata:
      labels:
        app: broker
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - broker
            topologyKey: kubernetes.io/hostname
      containers:
      - name: broker
        image: apache/rocketmq:5.3.0
        command:
        - sh
        - mqbroker
        args:
        - -n
        - name-service:9876
        - -c
        - /home/rocketmq/conf/broker.conf
        ports:
        - containerPort: 10911
          name: remoting
        - containerPort: 40911
          name: dledger
        resources:
          requests:
            memory: 2Gi
            cpu: 1
          limits:
            memory: 4Gi
            cpu: 2
        volumeMounts:
        - name: data
          mountPath: /home/rocketmq/store
        - name: config
          mountPath: /home/rocketmq/conf
      volumes:
      - name: config
        configMap:
          name: broker-config
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: ssd
      resources:
        requests:
          storage: 200Gi
9 RocketMQ 5.0 的 Proxy 模式相比传统模式有什么变化

答案:

RocketMQ 5.0 引入 Proxy 组件,采用 gRPC 协议替代 Remoting 协议,对外暴露标准 gRPC 接口,让客户端从重量级 Java 客户端解耦为轻量级多语言 SDK,同时统一计算逻辑到服务端。

[分层展开]

  • 协议升级:原有 Remoting 协议(Java 序列化)替换为 gRPC(Protobuf);客户端支持多语言(Go、Rust、C++、Python、Node.js)。
  • Proxy 角色:Proxy 作为 Broker 前端代理,承接客户端的 gRPC 请求,解析后转发到 Broker;支持独立部署(独立 Pod)或嵌入 Broker 部署。
  • 轻量级客户端:客户端不再需加载完整 Broker 逻辑,仅需 gRPC Stub + 简单路由逻辑,SDK 体积显著减小。
  • 计算下沉:Consumer Rebalance、消息过滤、顺序消息排序等逻辑从客户端迁移到 Proxy,客户端逻辑大幅简化。
  • 多协议入口:Proxy 同时支持 Remoting 协议(兼容存量客户端)与 gRPC 协议(新客户端);实现协议层平滑迁移。
维度RocketMQ 4.x 传统模式RocketMQ 5.0 Proxy 模式
通信协议Remoting(基于 Netty)gRPC(基于 Protobuf)
客户端语言仅 Java多语言
Rebalance客户端执行Proxy 执行
消息过滤客户端执行Proxy / Broker 执行
部署方式NameServer + Broker+ Proxy(独立或内嵌)
# Proxy CR 示例
apiVersion: rocketmq.apache.org/v1alpha1
kind: Proxy
metadata:
  name: proxy
spec:
  size: 3
  proxyImage: apache/rocketmq-proxy:5.3.0
  nameServers: name-service:9876
  proxyMode: cluster  # 或 embedded
  proxyConfig:
    remotingProtocolEnable: true
    grpcServerPort: 8080
    maxInboundMessageSize: 134217728
10 Pop 消费模式的机制与优势是什么

答案:

Pop 消费模式是 RocketMQ 5.0 引入的轻量级消费模式,由 Proxy/Broker 端管理消费进度与负载均衡,客户端仅需发起 Pop 请求拉取消息并 Ack 确认,大幅降低客户端复杂度。

[分层展开]

  • 传统 Pull 模式问题:客户端需本地管理消费位点、参与 Rebalance、处理 Queue 锁定;逻辑重且多语言 SDK 实现成本高。
  • Pop 机制:Consumer 向 Broker 发送 Pop 请求指定 Topic、Consumer Group、批次大小;Broker 在服务端完成 Queue 选择、消息拉取和位点推进;返回消息后,Consumer 需在可见超时(Invisible Time)内 Ack;超时未 Ack 则消息自动重投。
  • Pop 轮询队列:服务端维护 Pop 轮询索引(Pop CheckPoint),记录每个 Consumer Group 对每个 Queue 的 Pop 位置;多个 Consumer 实例竞争 Pop 同一 Queue,由 Broker 协调避免重复。
  • IOT 机制:Invisible Time(不可见时间),消息 Pop 后在该时间段内对其他 Consumer 不可见;Consumer 处理成功后 Ack 删除消息,失败则返回对消费可见。
  • 优势:客户端无状态、无需本地管理位点、不需要 Rebalance 参与、多语言 SDK 实现成本低;吞吐量接近传统 Pull 模式。
// Pop 消费示例
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    .setConsumerGroup("my-group")
    .setClientConfiguration(clientConfig)
    .setAwaitDuration(Duration.ofSeconds(30))
    .setSubscriptionExpressions(Collections.singletonMap("my-topic", FilterExpressionType.TAG, "*"))
    .build();

List<MessageView> messages = consumer.receive(16, Duration.ofSeconds(15));
for (MessageView mv : messages) {
    // 处理消息
    consumer.ack(mv);
}
11 RocketMQ 的消息存储机制(CommitLog / ConsumeQueue / IndexFile)如何协作

答案:

RocketMQ 采用三层存储模型,CommitLog 存储消息原始数据保证顺序写高性能,ConsumeQueue 存储消息逻辑偏移量索引加速消费,IndexFile 通过哈希索引实现消息按 Key/Tag 快速检索。

[分层展开]

  • CommitLog:所有 Topic 的消息顺序追加写入同一个 CommitLog 文件;单个文件默认 1GB;顺序写避免随机 IO,充分利用磁盘顺序写性能;消息存储格式包含:消息长度、MagicCode、消息体、属性。
  • ConsumeQueue:按 Topic-QueueId 为维度组织消费索引文件;每条索引固定 20 字节(CommitLogOffset + Size + Tag HashCode);Consumer 拉取消息时先读 ConsumeQueue 获取偏移量,再随机读 CommitLog。
  • IndexFile:基于哈希索引提供消息 Key 和 Tag 的快速检索;格式:IndexHead(40 字节)+ Slot Table(hash 槽)+ Index Linked List(索引项);查询时计算 Key 的哈希值,定位到 Slot Table,遍历链表找到偏移量。
  • MappedFile 与 MappedFileQueue:CommitLog、ConsumeQueue、IndexFile 均基于内存映射文件实现;MappedFile 将文件映射到虚拟内存,读写操作直接作用于内存并由 OS 异步刷盘;MappedFileQueue 管理文件轮转。
CommitLog (所有 Topic 消息顺序写)
  ├── ReputMessageService (异步构建消费索引)
  ├── ConsumeQueue (Topic-A/Queue-0)  [20B per entry]
  ├── ConsumeQueue (Topic-A/Queue-1)
  ├── ConsumeQueue (Topic-B/Queue-0)
  └── IndexFile (Hash 索引,按 Key/Tag 检索)
# Broker 存储目录结构
store/
├── commitlog/
│   └── 00000000000000000000
│   └── 00000000001073741824
├── consumequeue/
│   └── topic-a/
│       └── 0/
│           └── 00000000000000000000
│       └── 1/
│           └── 00000000000000000000
├── index/
│   └── 20260526120000000
├── config/
│   └── consumerOffset.json
│   └── delayOffset.json
└── abort
12 RocketMQ 的消息过滤支持哪些方式,各自适用什么场景

答案:

RocketMQ 支持 Tag 过滤、SQL92 表达式过滤与 ClassFilter(类过滤)三种消息过滤方式。Tag 过滤为 Producer 端标记式过滤,SQL92 表达式过滤在 Broker 端按属性匹配,ClassFilter 为 Consumer 端按类名过滤。

[分层展开]

  • Tag 过滤:消息发送时设置 Tag 字符串(最长 255 字节);Consumer 订阅时指定 Tag 或通配符(* 匹配所有,|| 多 Tag);过滤在 Broker 端执行,匹配 ConsumeQueue 中的 Tag HashCode。
  • SQL92 表达式过滤:基于消息属性(Properties)进行 SQL92 语法表达式匹配;enablePropertyFilter=true 设置为启用;过滤在 Broker 端执行,需开启 enableCalcFilterBitMap 计算布隆过滤器加速;支持语法:=、<>、>=、<=、AND、OR、IS NULL、IS NOT NULL
  • ClassFilter:通过 Java 类全限定名过滤;Producer 发送时设置消息类型,Consumer 反序列化时匹配类名;仅在 Java 客户端可用。
过滤方式执行位置过滤依据性能适用场景
TagBrokerTag String高(基于 ConsumeQueue)简单分类消息
SQL92Broker消息属性中(需计算表达式)多条件筛选
ClassFilterConsumerJava 类名低(需反序列化)对象消息
// Tag 过滤订阅
consumer.subscribe("order-topic", "paid||shipped");

// SQL92 表达式过滤
consumer.subscribe("order-topic", 
    MessageSelector.bySql("amount > 100 AND status = 'paid'"));

// Producer 设置属性以支持 SQL92 过滤
msg.putUserProperty("amount", "150");
msg.putUserProperty("status", "paid");
13 RocketMQ 事务消息的两阶段提交机制是怎样实现的

答案:

RocketMQ 事务消息实现两阶段提交:第一阶段发送 Half Message(对 Consumer 不可见),第二阶段根据本地事务执行结果决定 Commit 或 Rollback;若第二阶段超时未收到确认,Broker 通过事务回查机制向 Producer 确认最终状态。

[分层展开]

  • Half Message:Producer 发送消息时标记为事务消息;Broker 存储消息但标记为 Half 状态;Consumer 无法消费 Half 消息。
  • 本地事务执行:Producer 收到 Broker 的 Half Message 写入成功响应后执行本地事务(如扣款、更新数据库)。
  • Commit / Rollback:本地事务执行成功后发送 Commit 请求,Broker 将消息标记为可消费状态;执行失败发送 Rollback,Broker 删除 Half Message。
  • 事务回查(Check):若 Broker 在超时时间内未收到 Commit/Rollback 请求,主动回调 Producer 的事务回查接口;Producer 查询本地事务状态后返回 Commit 或 Rollback;回查最多执行 15 次。
Producer                    Broker                   Consumer
   |                          |                         |
   |-- Half Message -------->|                         |
   |<-- OK ------------------|                         |
   |                          |                         |
   |-- 执行本地事务           |                         |
   |   (update DB)           |                         |
   |                          |                         |
   |-- Commit / Rollback --->|                         |
   |<-- OK ------------------|                         |
   |                          |                         |
   |                          |-- (消息可消费) --------->|
   |                          |                         |
   |     (超时未确认)         |                         |
   |<-- Check 回查 ----------|                         |
   |-- Commit / Rollback --->|                         |
// 事务消息发送
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            dbService.updateOrder(orderId);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态
        Order order = dbService.getOrder(msg.getKeys());
        return order.isPaid() ? LocalTransactionState.COMMIT_MESSAGE 
                              : LocalTransactionState.ROLLBACK_MESSAGE;
    }
});
producer.sendMessageInTransaction(msg, orderId);
14 RocketMQ 延迟消息的实现机制是什么

答案:

RocketMQ 延迟消息基于预定义延迟级别与内部 SYSTEM Topic 实现,Broker 将延迟消息写入延迟专属 ConsumeQueue(SCHEDULE_TOPIC_XXXX),按照延迟级别在对应时间点将消息还原至原始 Topic 供 Consumer 消费。

[分层展开]

  • 18 个延迟级别:Level 1 = 1s, 2 = 5s, 3 = 10s, 4 = 30s, 5 = 1m, 6 = 2m, 7 = 3m, 8 = 4m, 9 = 5m, 10 = 6m, 11 = 7m, 12 = 8m, 13 = 9m, 14 = 10m, 15 = 20m, 16 = 30m, 17 = 1h, 18 = 2h。
  • 实现原理:Producer 设置消息 delayTimeLevel 属性;Broker 收到延迟消息后,将原始 Topic 与 QueueId 替换为 SCHEDULE_TOPIC_XXXX 和对应延迟级别的 QueueId;CommitLog 存储延迟消息;ScheduleMessageService 定时轮询每个延迟级别的 ConsumeQueue 偏移量;到达延迟时间后读取 CommitLog 中的原始消息还原到原始 Topic。
  • RocketMQ 5.0 改进:支持任意精度延迟时间(基于 TimerWheel 时间轮);可通过 setDeliverTimeMs 设置精确时间戳延迟;时间轮粒度可配置。
# 自定义延迟级别(messageDelayLevel)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
// 延迟消息发送
Message msg = new Message("order-topic", body.getBytes());
msg.setDelayTimeLevel(3);  // 延迟 10 秒

// RocketMQ 5.0 精确延迟
msg.setDeliverTimeMs(System.currentTimeMillis() + 60_000);  // 1 分钟后
15 Consumer Rebalance 机制如何保证消息不丢失、不重复

答案:

Consumer Rebalance 在 Consumer Group 内实例发生增删或 Topic Queue 数量变化时,重新分配 Queue 到 Consumer 实例的映射关系;通过广播心跳与锁机制实现 Queue 分配的一致性与平滑过渡。

[分层展开]

  • 触发条件:Consumer 实例上线/下线、Topic 新建/删除、Queue 数量变更、Broker 上线/下线。
  • 分配算法(AllocateMessageQueueStrategy):
    • AllocateMachineRoomNearby:基于机房就近分配;
    • AllocateMessageQueueAveragely:平均分配(默认);
    • AllocateMessageQueueAveragelyByCircle:轮询平均分配;
    • AllocateMessageQueueByConfig:固定配置分配;
    • AllocateMessageQueueConsistentHash:一致性哈希分配。
  • Rebalance 流程:Consumer 定期(20 秒)从 Broker 获取当前 Consumer Group 所有实例 ID;根据分配策略计算自己所属的 Queue 列表;获取新分配 Queue 的锁;释放不再属于自己的 Queue 的锁;更新本地 Queue 分配表。
  • Queue 锁机制:Consumer 向 Broker 请求对特定 Queue 的分布式锁(60 秒 TTL);持有锁期间该 Queue 仅被该 Consumer 消费;Rebalance 期间旧 Consumer 仍持有锁直至 Rebalance 完成,避免重复消费。
  • 消费位点持久化:每个 Queue 的消费位点异步持久化到 Broker;Consumer 重启后从 Broker 加载位点续消费,避免丢失。
# 查看 Consumer Group 的 Queue 分配
./mqadmin consumerStatus -n namesrv:9876 -g my-consumer-group

# 查看消费位点
./mqadmin consumerProgress -n namesrv:9876 -g my-consumer-group
16 顺序消息的实现原理与注意事项是什么

答案:

顺序消息通过 MessageQueueSelector 将具有相同业务键的消息路由到同一 Message Queue,利用 Queue 内 FIFO 特性保证消费顺序;需配合锁机制与单线程顺序消费实现严格有序保证。

[分层展开]

  • 分区顺序实现:Producer 端通过自定义 MessageQueueSelector 将相同业务键(如订单 ID)的消息选择同一 Queue;同一 Queue 内消息严格按写入顺序存储;Consumer 端对同一 Queue 采用单线程顺序拉取。
  • 全局顺序实现:Topic 仅配置一个 WriteQueue;所有消息写入同一 Queue;Consumer 单实例消费;性能低,仅适用于全局严格顺序场景。
  • Consumer 端锁机制:顺序消费时 Consumer 向 Broker 申请 Queue 锁,确保同一时刻只有一个 Consumer 实例消费该 Queue;防止 Rebalance 期间多实例同时消费导致乱序。
  • 服务端保序:RocketMQ 5.0 Proxy 模式下,顺序消息消费由 Proxy 统一调度,Proxy 确保同一 Queue 的消息以单线程顺序 Push 给 Consumer。
  • 注意事项:顺序消息牺牲了并行度(单 Queue 单线程消费);发送端需用同步方式确保发送成功,失败重试需保持发送顺序;Consumer 端异常重试会阻塞后续消息消费(可配置 suspendCurrentQueueTimeMillis)。
// 分区顺序发送
producer.send(msg, (mqs, msg1, arg) -> {
    long orderId = (long) arg;
    int index = (int) (orderId % mqs.size());
    return mqs.get(index);
}, orderId);

// 顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
                                                ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            processOrder(msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
17 消息轨迹(Message Trace)的机制是什么

答案:

消息轨迹记录消息从 Producer 发送、Broker 存储到 Consumer 消费的完整链路信息,包括各节点的时间戳、IP、状态等;开启 Trace 后 Broker 将轨迹数据异步发送至内部 Trace Topic 持久化。

[分层展开]

  • Trace 数据结构TraceContext 封装一条消息的完整生命周期;TraceBean 记录每个节点的信息(MsgId、Offset、StoreHost、CostTime 等);分为 Produce 阶段、Store 阶段、Consume 阶段。
  • Trace Topic:默认 RMQ_SYS_TRACE_TOPIC;轨迹数据作为普通消息异步发送到该系统 Topic;不阻塞主流程,性能开销极低。
  • 启用方式
    • Producer 端:enableMsgTrace = true
    • Consumer 端:enableMsgTrace = true
    • 可通过 AsyncTraceDispatcher 自定义 Trace 发送参数和线程池。
  • Trace 数据消费:通过 RocketMQ Console 或自定义 Consumer 消费 Trace Topic 数据进行轨迹查询与可视化。
  • 性能影响:异步发送 Trace,对主流程延迟影响 < 1ms;Trace Topic 额外消耗 Broker 存储和网络带宽。
// Producer 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("trace-producer-group");
producer.setNamesrvAddr("namesrv:9876");
producer.setEnableMsgTrace(true);

// 自定义 Trace 配置
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
    "producer-group",
    TraceConstants.TraceTopic.RMQ_SYS_TRACE_TOPIC,
    namesrvAddr
);
dispatcher.setTraceQueueSize(1024);
dispatcher.setMaxMsgSize(128000);
producer.setTraceDispatcher(dispatcher);
18 RocketMQ 的 ACL 权限控制如何实现

答案:

RocketMQ ACL 通过 plain_acl.yml 配置文件定义 AccessKey / SecretKey 与资源权限映射,在 Broker 端对所有请求进行鉴权与授权检查,支持用户管理、Topic/Group 级别读写控制与 IP 白名单。

[分层展开]

  • ACL 开关:Broker 配置 aclEnable=true 全局启用 ACL。
  • 认证方式:HMAC-SHA1 签名认证;Client 使用 SecretKey 对请求签名,Broker 端校验签名真实性。
  • 权限模型AccessKey 唯一标识用户;SecretKey 用于签名;权限粒度:Topic + ConsumerGroup 级别的 PUB(发布)与 SUB(订阅);支持 IP 白名单限制。
  • 全局白名单globalWhiteRemoteAddresses 配置免密 IP 列表,管理 IP 无需 ACL 校验。
  • 授权流程:Client 请求携带 Auth Token(AccessKey + 时间戳 + 签名);Broker RpcHook 拦截请求,提取 Token;校验签名是否与配置的 SecretKey 匹配;检查请求的 Topic/Group 是否在用户权限列表中。
# plain_acl.yml
globalWhiteRemoteAddresses:
  - 10.0.0.*
  - 192.168.0.*

accounts:
  - accessKey: producer-ak
    secretKey: producer-sk-xxx
    whiteRemoteAddress:
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - order-topic=PUB
    groupPerms:

  - accessKey: consumer-ak
    secretKey: consumer-sk-yyy
    whiteRemoteAddress:
    admin: false
    defaultTopicPerm: DENY
    defaultGroupPerm: DENY
    topicPerms:
      - order-topic=SUB
    groupPerms:
      - order-consumer=SUB

  - accessKey: admin-ak
    secretKey: admin-sk-zzz
    admin: true
# Broker 配置
aclEnable=true
19 RocketMQ 的监控指标体系与 Prometheus + Grafana 集成方式

答案:

RocketMQ 通过 Prometheus Exporter 暴露 Metrics 端点,将各组件指标(Broker TPS、存储延迟、消费堆积、线程池)转换为 Prometheus 格式,经 Grafana Dashboard 可视化展示。

[分层展开]

  • Exporter 集成:RocketMQ 5.0 内置 Prometheus Exporter(rocketmq-exporter 独立部署);Broker 启动时以 --enable-metrics 参数开启;Exporter 从 Broker 拉取运行时指标并暴露 /metrics 端点。
  • 核心指标分类
    • Broker 吞吐rocketmq_broker_tps(秒级消息 TPS)、rocketmq_broker_qps(Put TPS)、rocketmq_broker_get_tps(Get TPS)。
    • 存储指标rocketmq_store_dispatch_behind_bytes(分发落后字节数)、rocketmq_store_message_store_time(消息存储耗时)。
    • 消费堆积rocketmq_consumer_offset(消费位点)、rocketmq_broker_offset(Broker 最大位点);差值即为堆积量。
    • 线程池rocketmq_thread_pool_executor_queue_size(线程池队列长度)。
  • Grafana Dashboard:官方提供 RocketMQ Dashboard JSON 模板;涵盖 Broker Overview(TPS/QPS/RT)、Topic Detail(生产消费 TPS、堆积)、Consumer Detail(消费延迟、Rebalance 次数)、存储(CommitLog Diff、盘使用率)。
  • Prometheus ServiceMonitor(Kubernetes 场景):创建 PodMonitorServiceMonitor 自动发现 RocketMQ Pod 的 Metrics 端点。
# rocketmq-exporter 部署示例
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-exporter
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rocketmq-exporter
  template:
    metadata:
      labels:
        app: rocketmq-exporter
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "5557"
        prometheus.io/path: "/metrics"
    spec:
      containers:
      - name: exporter
        image: apache/rocketmq-exporter:1.0.0
        ports:
        - containerPort: 5557
        command:
        - java
        - -jar
        - rocketmq-exporter.jar
        - --rocketmq.namesrv.addr=name-service:9876
20 RocketMQ 的数据备份与恢复方案有哪些

答案:

RocketMQ 数据备份与恢复方案包括 Broker 主从复制(热备)、CommitLog 文件备份(温备)、全量存储快照(冷备)以及 Kubernetes 环境下基于 VolumeSnapshot 的存储级快照备份。

[分层展开]

  • 热备(主从复制):Master-Slave 异步/同步复制实现实时数据冗余;Slave 提供读服务与故障接管能力;RPO 取决于复制模式(ASYNC 有秒级延迟,SYNC 为零)。
  • 温备(CommitLog 文件备份):按日期/时间备份 CommitLog 文件到对象存储(S3/OSS/MinIO);脚本周期(如 crontab 每小时)拷贝已关闭的 CommitLog 文件;恢复时按顺序回放 CommitLog 重建 ConsumeQueue 和 IndexFile。
  • 冷备(全量快照):通过 mqadmin 导出消费位点和配置;备份整个 store/ 目录;适用于重大变更前或定期冷备。
  • Kubernetes VolumeSnapshot:利用 CSI 快照功能创建 PVC 快照;基于时间调度(CronJob 触发)自动创建 VolumeSnapshot;恢复时从快照创建新 PVC 并挂载到新 Pod。
# CommitLog 文件备份脚本
#!/bin/bash
BACKUP_DIR="/backup/rocketmq/$(date +%Y%m%d)"
mkdir -p ${BACKUP_DIR}
# 仅备份不再写入的 CommitLog 文件
find /store/commitlog -name "0000000*" ! -newer /store/abort -exec cp {} ${BACKUP_DIR} \;

# 导出消费位点
./mqadmin exportConfigs -n namesrv:9876 -c DefaultCluster > /backup/configs.json
# VolumeSnapshot 示例
apiVersion: snapshot.storage.k8s.io/v1
kind: VolumeSnapshot
metadata:
  name: broker-data-snapshot-20260526
spec:
  volumeSnapshotClassName: csi-snapclass
  source:
    persistentVolumeClaimName: data-broker-0
21 RocketMQ 5.0 的主从切换(Controller)机制如何实现自动切换

答案:

RocketMQ 5.0 引入 Controller 组件实现 Broker 主从自动切换,Controller 基于 Raft 协议选举 Leader 并管理 Broker Group 的主从角色,检测 Master 故障后自动提升 Slave 为新 Master,切换过程对客户端透明。

[分层展开]

  • Controller 架构:Controller 以独立进程部署(3 节点 Raft 集群);监控 Broker Group 内各节点的健康状态;维护主从映射关系;NameServer 通过 Controller 获取最新 Master 地址。
  • Broker 注册:Broker 启动后向 Controller 注册(BrokerName、BrokerId、Address);Controller 从 Raft 状态机中持久化 Broker 元数据;Broker 定期发送心跳。
  • 故障检测与切换:Controller 检测 Master 心跳超时(默认 10 秒);从 Slave 节点中选择新 Master(优先级:SYNC_MASTER Slave > ASYNC Master Slave);更新 Raft 状态机中的主从映射;通知其他 Broker 和 NameServer 路由变更。
  • 切换对客户端透明:Producer 发现原 Master 发送失败后从 NameServer 获取新路由;Consumer 自动 Rebalance 到新 Master 所在节点;消息不丢失前提是使用 SYNC_MASTER 或 Dledger。
# Controller CRD 部署示例
apiVersion: rocketmq.apache.org/v1alpha1
kind: Controller
metadata:
  name: controller
spec:
  size: 3
  image: apache/rocketmq:5.3.0
  controllerConfig:
    enableControllerMode: true
    scanNotActiveBrokerInterval: 5000
# Broker 连接 Controller 配置
enableControllerMode=true
controllerAddr=controller-0.controller-svc:9878;controller-1.controller-svc:9878;controller-2.controller-svc:9878
22 消息堆积的排查思路与处理方案有哪些

答案:

消息堆积指 Consumer 消费速度落后于 Producer 生产速度,在 Queue 中积压大量未消费消息;排查方向包括 Consumer 消费能力不足、Consumer 实例数不够、消费逻辑异常阻塞,处理方案包括扩容 Consumer 实例、优化消费逻辑与临时提升消费并行度。

[分层展开]

  • 堆积监控:通过 rocketmq_consumer_offsetrocketmq_broker_offset 差值计算堆积量;Prometheus 告警规则设置堆积阈值;RocketMQ Console 直观查看 Group 消费 TPS 与堆积趋势。
  • 排查流程
    1. 确认堆积的 Topic 与 Consumer Group;
    2. 检查 Consumer 实例数是否小于 Queue 数量(consumerStatus);
    3. 检查单个 Consumer 的消费 RT 是否过长(消费逻辑阻塞/慢查询/锁等待);
    4. 检查 Consumer 线程池队列是否满(threadPoolQueueSize 指标);
    5. 检查 Broker 的 Get TPS 是否已达上限。
  • 紧急处理
    • 横向扩容 Consumer 实例(不超过 Queue 数量);
    • 临时提升 Consumer 线程池大小(consumeThreadMin / consumeThreadMax);
    • 创建临时 Consumer Group + 新 Topic,转移部分消息(mqadmin 复制消费位点);
    • 搭建临时 Consumer 集群批量转移消息;
    • Queue 数量过少则动态增加 Queue(需评估影响)。
  • 长期治理
    • 合理规划 Queue 数量(通常 >= 预期最大 TPS / 单 Queue TPS 上限);
    • Consumer 弹性伸缩(HPA 基于堆积量);
    • 消费逻辑异步化、批量化、预取优化。
# 查看 Consumer Group 堆积状态
./mqadmin consumerProgress -n namesrv:9876 -g my-group

# 动态调整 Queue 数量
./mqadmin updateTopic -n namesrv:9876 -t order-topic -r 32 -w 32 -c DefaultCluster
# Consumer HPA 配置(Kubernetes)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: consumer-deploy
  minReplicas: 2
  maxReplicas: 16
  metrics:
  - type: External
    external:
      metric:
        name: rocketmq_consumer_lag
        selector:
          matchLabels:
            group: my-group
      target:
        type: AverageValue
        averageValue: 1000
23 RocketMQ 性能调优的关键参数有哪些

答案:

RocketMQ 性能调优围绕 JVM 内存、操作系统参数、刷盘策略、网络配置与线程池五个维度展开,核心调整项包括堆内存与堆外内存分配、刷盘模式与 CommitLog 异步构造 ConsumeQueue 的并行度。

[分层展开]

  • JVM 内存

    • 堆内存:Broker 默认 8G,大 Broker 建议 16-32G;rocketmqHome/bin/runbroker.shJAVA_OPT-Xms / -Xmx
    • 堆外内存:CommitLog 使用 MappedFile 内存映射,系统需预留至少 CommitLog 文件大小 + 1 个文件的堆外内存;
    • GC 选择:G1 GC(-XX:+UseG1GC)适用于大堆,延迟可控。
  • 操作系统参数

    • vm.swappiness=1:减少 Swap 交换,避免 MappedFile 被换出;
    • vm.dirty_ratio=40vm.dirty_background_ratio=10:控制 Page Cache 脏页比例;
    • 文件描述符上限:ulimit -n 65535
  • 刷盘策略

    • flushDiskType=ASYNC_FLUSH:异步刷盘,高吞吐场景推荐;
    • flushCommitLogTimed=falseflushIntervalCommitLog=500:控制刷盘间隔。
  • 线程池配置

    • sendMessageThreadPoolNums:发送消息线程池核心数(CPU 密集 = CPU 核数);
    • pullMessageThreadPoolNums:拉消息线程池核心数(IO 密集 = CPU 核数 x 2);
    • useReentrantLockWhenPutMessage=true:减少锁竞争(禁用自旋锁)。
  • 网络配置

    • serverSocketRcvBufSize=65536:TCP 接收缓冲区;
    • osPageCacheBusyTimeOutMills=1000:Page Cache 忙时等待超时(提高写入成功率)。
# 性能优化 Broker 配置示例
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true
flushDiskType=ASYNC_FLUSH
flushCommitLogTimed=false
flushIntervalCommitLog=500
transientStorePoolEnable=true
transientStorePoolSize=5
osPageCacheBusyTimeOutMills=1000
# JVM 参数优化
JAVA_OPT="${JAVA_OPT} -server -Xms16g -Xmx16g -Xmn6g -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=30"
24 RocketMQ Connector 跨集群复制如何实现

答案:

RocketMQ Connector(即 RocketMQ Connect)是基于 Connect 框架的跨集群数据同步组件,通过 Source Connector 从源集群消费消息并写入目标集群的 Sink Connector,实现异构集群间的消息同步与灾备。

[分层展开]

  • 架构设计:RocketMQ Connect 独立部署,包含 Worker(任务执行器)与 Runtime(运行环境);SourceTask 从源集群拉消息;SinkTask 将消息写入目标集群;Offset 管理确保断点续传。
  • Connector 类型
    • RocketMQSourceConnector / RocketMQSourceTask:从 RocketMQ 集群消费;
    • RocketMQSinkConnector / RocketMQSinkTask:写入 RocketMQ 集群;
    • 支持异构数据源:RocketMQ to Kafka、MySQL to RocketMQ 等。
  • 复制模式
    • 主-备复制:Active/Standby 模式,源集群全部 Topic 异步复制到目标集群;
    • 单向灾备:生产消息到本地集群,异步复制到远程灾备集群;
    • 双向同步:两个集群互为备份,需处理消息循环回写问题。
  • 位点同步:SourceTask 定期保存消费位点到目标集群的 Offset Store Topic;重启后从上次保存的位点恢复同步。
// RocketMQ Source Connector 配置
{
  "connector-class": "org.apache.rocketmq.connect.rocketmq.RocketMQSourceConnector",
  "rocketmq.source.namesrv.addr": "source-namesrv:9876",
  "rocketmq.source.cluster": "cluster-a",
  "rocketmq.source.topic": "order-topic",
  "rocketmq.source.group": "connector-source-group",
  "rocketmq.target.namesrv.addr": "target-namesrv:9876",
  "rocketmq.target.topic": "order-topic",
  "tasks.max": "4"
}
# RocketMQ Connect StatefulSet(Kubernetes)
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rocketmq-connect
spec:
  serviceName: connect-svc
  replicas: 2
  template:
    spec:
      containers:
      - name: connect
        image: apache/rocketmq-connect:1.0.0
        env:
        - name: ROCKETMQ_HOME
          value: /home/rocketmq
        - name: CONNECT_WORKER_NUM
          value: "8"
25 RocketMQ 5.0 的 Kafka Bridge 如何实现 Kafka 协议兼容

答案:

RocketMQ 5.0 Kafka Bridge 在 Proxy 层实现 Kafka 协议解析,将 Kafka 原生 TCP 协议请求转换为 RocketMQ 内部 Remoting/gRPC 请求,实现现有 Kafka Producer/Consumer 代码无需修改即可连接 RocketMQ 集群。

[分层展开]

  • 协议转换层:Proxy 在 9092 端口监听 Kafka 原生协议(SASL、Metadata、Produce、Fetch、OffsetCommit、OffsetFetch 等请求);解析 Kafka 协议二进制帧,提取 Topic、Partition、Consumer Group 等信息;将 Partition 映射为 RocketMQ Message Queue,将 Consumer Group Offset 映射为消费位点。
  • Kafka 到 RocketMQ 映射
    • Kafka Topic -> RocketMQ Topic;
    • Kafka Partition -> RocketMQ Message Queue;
    • Kafka Consumer Group -> RocketMQ Consumer Group;
    • Kafka Offset -> RocketMQ Queue Offset;
    • Kafka Key -> RocketMQ Message Key;
    • Kafka Headers -> RocketMQ Properties。
  • 部署方式:Kafka Bridge 嵌入 RocketMQ Proxy 或以独立模式部署;启用 proxyMode=kafka 或通过配置开启 Kafka 协议监听。
  • 兼容范围:基本兼容 Kafka 0.10+ API(Produce、Fetch、OffsetCommit、OffsetFetch、Metadata、GroupCoordinator 请求);支持 SASL/PLAIN 认证;支持 Kafka Consumer 的自动 Rebalance;事务与幂等写入不支持。
# Proxy 开启 Kafka Bridge 配置
apiVersion: rocketmq.apache.org/v1alpha1
kind: Proxy
metadata:
  name: proxy
spec:
  proxyConfig:
    kafkaProtocolEnable: true
    kafkaListenPort: 9092
    remotingProtocolEnable: true
    grpcProtocolEnable: true
# Kafka 客户端代码无需修改
bootstrap.servers=proxy-service:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
26 RocketMQ 的存储清理与过期策略如何配置

答案:

RocketMQ 通过时间过期(fileReservedTime)与磁盘空间水位(diskMaxUsedSpaceRatio)两种策略触发 CommitLog 文件清理,配合 ConsumeQueue 清理级联删除实现磁盘空间回收。

[分层展开]

  • CommitLog 文件清理
    • 时间过期fileReservedTime(默认 72 小时),CommitLog 文件最后修改时间超过该阈值后标记可删除;
    • 磁盘水位diskMaxUsedSpaceRatio(默认 75%),磁盘使用率超过该比例后强制清理最旧的 CommitLog 文件。
  • 清理流程:Broker 后台线程(CleanCommitLogService / CleanConsumeQueueService)定期检查;清理前检查 CommitLog 文件是否包含未被消费的消息(最低消费位点之前);在凌晨(deleteWhen=04)执行文件物理删除。
  • ConsumeQueue 清理:CommitLog 文件删除后,对应 ConsumeQueue 中的索引条目跟着删除;ConsumeQueue 文件的物理删除稍微滞后;destroyMapedFileIntervalForcibly=120000 控制强制清理未释放映射的 MappedFile 间隔。
  • 清理保护cleanFileForciblyEnable=true 允许强制删除过时文件;deleteCommitLogFilesInterval=100(毫秒)控制删除间隔避免 IO 冲击。
# 存储清理配置
fileReservedTime=72
deleteWhen=04
diskMaxUsedSpaceRatio=75
deleteCommitLogFilesInterval=100
destroyMapedFileIntervalForcibly=120000
cleanFileForciblyEnable=true
deleteConsumeQueueFilesInterval=100
# 手动触发清理
./mqadmin cleanExpiredCQ -n namesrv:9876 -c DefaultCluster
27 RocketMQ 与 Kafka 在设计理念与适用场景上有何差异

答案:

RocketMQ 与 Kafka 均为分布式消息队列,核心差异体现在存储模型(RocketMQ 所有 Topic 共用一个 CommitLog 顺序写,Kafka 每个 Partition 独立 Segment 文件顺序写)、消费模型(RocketMQ 支持 Pull + Pop + Push,Kafka 仅 Pull)、事务消息支持以及延迟消息原生能力。

[分层展开]

维度RocketMQKafka
存储模型单 CommitLog + 按 Topic 索引(ConsumeQueue)每 Partition 独立 Segment 文件
消费模型Pull / Pop / Push(服务端 Push)Pull(客户端长轮询)
消费模式Clustering / BroadcastingConsumer Group(分区绑定)
延迟消息原生 18 级别(5.0 精确时间)无原生支持(需外部实现)
事务消息原生两阶段 + 回查幂等 + 事务 Coordinator(KIP-98)
消息过滤Tag / SQL92 / ClassFilter 服务端过滤客户端拉取后过滤
顺序消息分区顺序(Queue 内严格有序)分区顺序(Partition 内严格有序)
高可用Master-Slave / Dledger(Raft) / ControllerISR(In-Sync Replicas)
跨集群复制RocketMQ Connector / 5.0 原生MirrorMaker 2.0
多协议Remoting(4.x)/ gRPC(5.0)/ Kafka BridgeKafka Protocol
MQTT 支持RocketMQ 5.0 MQTT Bridge需 Kafka Connect 扩展
Topic 数量受存储模型限制(建议 < 5000)单 Partition 独立文件(可 < 10w)
适用场景金融交易、电商订单、延迟任务、事务消息日志采集、流处理、大数据管道

选择建议

  • RocketMQ 优先场景:需要延迟/事务消息、Topic 数量较少且消息量密集、金融级事务一致性要求、需要多语言 gRPC 客户端。
  • Kafka 优先场景:日志/事件采集、大数据流处理(与 Spark/Flink 集成)、海量 Topic 分区、高吞吐离线数据分析。
28 RocketMQ 与 RabbitMQ 在设计理念与适用场景上有何差异

答案:

RocketMQ 面向分布式大规模消息处理,采用 Topic-Queue 分区模型与日志存储架构;RabbitMQ 面向企业消息路由,基于 AMQP 0-9-1 协议与 Exchange-Binding-Queue 路由模型。RocketMQ 吞吐量显著高于 RabbitMQ,RabbitMQ 路由灵活性显著高于 RocketMQ。

[分层展开]

维度RocketMQRabbitMQ
协议Remoting / gRPC(自有)AMQP 0-9-1(标准)
路由模型Topic -> MessageQueueExchange -> Binding -> Queue
Exchange 类型无(Topic 直接对应 Queue)Direct / Fanout / Topic / Headers
消息存储磁盘持久化(顺序写)内存 + 磁盘(MQTT / Lazy Queue)
消息确认Pull 模式 Ack;Pop 模式 AckConsumer Ack(Auto / Manual)
事务消息原生两阶段无(需分布式事务框架补偿)
延迟消息原生支持通过死信队列 + TTL 模拟
吞吐量十万级 TPS万级 TPS
集群扩展水平扩展(增加 Broker + Queue)垂直扩展 + Quorum Queue
高可用Master-Slave / Dledger / ControllerMirror Queue / Quorum Queue
管理界面Console(轻量)Management Plugin(功能丰富)
客户端语言Java / Go / C++ / Rust / Python / Node.js多语言(AMQP 标准客户端)
适用场景大规模分布式异步通信、流计算企业消息路由、任务分发、微服务 RPC

选择建议

  • RocketMQ 优先场景:高吞吐大规模消息通信、需要延迟/事务/顺序消息、大规模微服务异步解耦。
  • RabbitMQ 优先场景:灵活路由逻辑复杂、Exchange 多级路由、流量较小的后台任务分发、需要成熟管理界面的企业环境。
29 RocketMQ 常见故障的排查思路是什么

答案:

RocketMQ 常见故障包括消息发送失败、消费停滞、消息丢失、Broker 内存溢出、NameServer 连接异常;排查应遵循"客户端日志 -> Broker 日志 -> 网络连通性 -> 配置一致性"的递进顺序。

[分层展开]

  • 消息发送失败(SEND_ERROR / TIMEOUT

    1. 检查 Producer 是否连接 NameServer(namesrvAddr 配置);
    2. 检查 Topic 是否已在 Broker 上创建或允许自动创建(autoCreateTopicEnable=true);
    3. 检查 Broker Master 是否存活(mqadmin clusterList);
    4. 检查 SYNC_MASTER 模式下 Slave 是否可用(写失败可能因 Slave 不健康);
    5. 检查 Broker 消息容量是否满(diskMaxUsedSpaceRatio 水位导致拒绝写入)。
  • 消费停滞(堆积增多)

    1. 检查 Consumer 实例数是否小于 Queue 数量;
    2. 检查消费逻辑是否有异常阻塞(慢查询、死锁、线程池满);
    3. 检查 Consumer 是否发生频繁 Rebalance(网络抖动、Consumer 心跳丢失);
    4. 检查 ConsumeQueue 构建是否滞后(dispatchBehindBytes 指标)。
  • 消息丢失场景

    • ASYNC_MASTER + 异步刷盘 + Master 宕机 => 配置为 SYNC_MASTER + Dledger;
    • Consumer 消费后未提交 Ack + 实例异常退出 => 确保消费逻辑完成后 Ack;
    • NameServer 全部不可用 + 客户端路由缓存过期 => 部署 3 节点 NameServer。
  • Broker OOM / CPU 飙高

    1. CommitLog MappedFile 过多,堆外内存不足 => 减少 mapedFileSizeCommitLog 或增加节点内存;
    2. ConsumeQueue 文件过多(Topic 数量大)=> 减少 Topic 数量或优化 Topic 设计;
    3. sendMessageThreadPoolNums 过高导致 CPU 上下文切换 => 调整为核心数。
# 查看 Broker 状态
./mqadmin clusterList -n namesrv:9876

# 查看 Topic 路由信息
./mqadmin topicRoute -n namesrv:9876 -t order-topic

# 检查消费位点
./mqadmin consumerProgress -n namesrv:9876 -g my-group

# 查看 Broker 日志
tail -f /home/rocketmq/logs/rocketmqlogs/broker.log | grep ERROR
30 RocketMQ on Kubernetes 生产环境有哪些最佳实践

答案:

RocketMQ on Kubernetes 生产环境最佳实践涵盖集群拓扑规划、资源配置、持久化存储、高可用架构、监控告警、安全加固与运维自动化七个方面。

[分层展开]

集群拓扑规划

组件推荐数量部署形式说明
NameServer3Deployment(无状态)独立于 Broker 部署,避免共享故障域
Broker Master2+(每个 Group)StatefulSet(有状态)Dledger 模式每个 Group 3 副本
Proxy3Deployment启用 gRPC 协议,承载客户端连接
Controller3StatefulSetRocketMQ 5.0 自动主从切换

持久化存储

# 使用高性能 SSD StorageClass
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: rocketmq-ssd
provisioner: pd.csi.storage.gke.io  # 以 GCP 为例
parameters:
  type: pd-ssd
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true

资源配置

resources:
  requests:
    memory: 4Gi
    cpu: 2
  limits:
    memory: 8Gi
    cpu: 4

高可用架构

  • 反亲和性:同一 Broker Group 的 Pod 分散在不同可用区/节点。
  • PodDisruptionBudget:设置 maxUnavailable: 1 阻止维护期间 Broker 全部下线。
  • 健康检查:Liveness Probe 检查进程存活、Readiness Probe 检查 Broker 是否可注册。
livenessProbe:
  exec:
    command:
    - sh
    - -c
    - /home/rocketmq/bin/mqadmin clusterList -n localhost:9876 | grep BROKER
  initialDelaySeconds: 30
  periodSeconds: 10
  
readinessProbe:
  tcpSocket:
    port: 10911
  initialDelaySeconds: 20
  periodSeconds: 5

监控告警

  • Prometheus Exporter + Grafana Dashboard;
  • 关键告警规则:Broker TPS 突降超过 50%(5 分钟内);消费者堆积超过 10 万条;Consumer 全部离线(心跳丢失);磁盘使用率超过 80%;Broker 组件宕机。
# PrometheusRule 示例
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: rocketmq-alerts
spec:
  groups:
  - name: rocketmq
    rules:
    - alert: RocketMQConsumerLagHigh
      expr: (rocketmq_broker_offset - rocketmq_consumer_offset) > 100000
      for: 5m
      labels:
        severity: warning
      annotations:
        summary: "Consumer Group XQOPEN $labels.consumer_group XQCLOSE 堆积超过 10 万"

安全加固

  • 启用 ACL 基于 AK/SK 认证授权;
  • NetworkPolicy 限制 Broker 仅被指定标签的 Pod 访问;
  • NameServer 不对外暴露,仅集群内访问;
  • Secret 管理 AK/SK。

运维自动化

  • 基于 CronJob 的定期备份(VolumeSnapshot + CommitLog 文件备份);
  • 使用 ArgoCD / Flux 进行 GitOps 管理 CR 声明;
  • Helm Chart 版本化管理组件配置;
  • HPA 自动扩缩容 Consumer 实例;
  • 定期演练主从切换与灾备恢复。
# NetworkPolicy 限制 Broker 访问
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: broker-access
spec:
  podSelector:
    matchLabels:
      app: broker
  policyTypes:
  - Ingress
  ingress:
  - from:
    - podSelector:
        matchExpressions:
        - key: app
          operator: In
          values:
          - proxy
          - name-server
    ports:
    - port: 10911
      protocol: TCP