跳转到内容

Kafka on Kubernetes 面试题

30 道题
分类
中间件
题目数
30 道
已阅读 0 / 30 题
1 Kafka on Kubernetes 的部署方案对比(Strimzi / Confluent Operator / Banzaicloud Koperator)

答案:

Kubernetes 上部署 Kafka 的主流方案包括 Strimzi、Confluent for Kubernetes(CFK)和 Banzaicloud Koperator,三者均通过 Operator 模式实现声明式管理与自动化运维。

维度StrimziConfluent for KubernetesBanzaicloud Koperator
开源协议Apache 2.0(CNCF Sandbox)Confluent Community LicenseApache 2.0
Kafka 发行版Apache KafkaConfluent Platform(含企业特性)Apache Kafka
ZooKeeper 管理Operator 一并管理 ZK 集群Operator 管理 ZK需用户自行管理
KRaft 支持Strimzi 0.34+ 支持 KRaft 模式完整支持不支持
Topic 管理Topic Operator(KafkaTopic CRD)原生集成通过 KafkaTopic CRD
用户管理User Operator(KafkaUser CRD)Confluent RBAC需手动管理
MirrorMaker 2KafkaMirrorMaker2 CRDConfluent Replicator不支持
Connect 管理KafkaConnect CRDConfluent Control Center 集成不支持
Cruise Control内置集成内置集成不支持
Schema Registry需额外部署原生集成不支持
监控JMX Exporter + Prometheus annotationsConfluent Control Center需自行集成
网络TLS/SCRAM-SHA-512/OAuth2/mTLSmTLS/SASL/SSOTLS/SASL
存储PVC/JBOD/Tiered StoragePVCPVC
升级策略支持滚动升级/手动控制支持滚动升级有限支持
适用场景社区版 Kafka,CNCF 生态,中小规模企业合规,Confluent 生态,大规模简单部署场景

Strimzi 优势:CNCF 生态兼容性最佳,CRD 覆盖全生命周期,社区活跃度高。Confluent Operator 优势:企业级功能(RBAC、Control Center、Schema Registry、Tiered Storage)开箱即用,商业支持完善。Banzaicloud Koperator 优势:轻量,部署简单。

2 Strimzi Operator 的架构与核心 CRD

答案:

Strimzi Operator 采用分层架构,通过一组 Kubernetes CRD 描述 Kafka 集群、Topic、用户和数据集成组件,Operator 负责将 CRD 描述转换为实际的 StatefulSet、Deployment、Service 等 Kubernetes 原生资源。

核心 CRD 列表

CRD职责对应 K8s 资源
Kafka定义 Kafka 集群拓扑,含 Broker、ZK/KRaft、存储、网络、认证配置StatefulSet、Service、ConfigMap、PVC
KafkaTopic声明式管理 Topic 的分区数、副本因子、配置无(通过 Kafka Admin API 创建)
KafkaUser定义客户端用户及 ACL 权限Secret(存储凭证)
KafkaConnect部署和管理 Kafka Connect 集群Deployment/StatefulSet、Service
KafkaConnector管理单个 Connector 实例无(通过 Connect REST API)
KafkaMirrorMaker2部署 Mirrormaker 2 跨集群复制Deployment、Service
KafkaBridge部署 HTTP Bridge,支持 HTTP 协议访问 KafkaDeployment、Service
KafkaRebalance触发 Cruise Control 执行分区均衡无(通过 Cruise Control API)
KafkaNodePool定义 Broker 节点池(支持异构 Broker)StatefulSet

架构工作流

Kafka CR 创建 → Cluster Operator Watch → 
  创建 ZK StatefulSet(非 KRaft 模式)→ 
  创建 Broker StatefulSet → 
  Entity Operator(Topic Operator + User Operator)启动 → 
  监听 KafkaTopic / KafkaUser CRD → 
  通过 Admin API 操作 Kafka 集群

Strimzi Operator 控制平面部署在 kafka 命名空间,支持单命名空间或集群级别监听。Cluster Operator 通过 STRIMZI_NAMESPACE 环境变量控制监听范围。

3 Strimzi 的 Kafka 集群 StatefulSet 部署

答案:

Strimzi 将每个 Kafka Broker 和 ZooKeeper 节点映射为 StatefulSet 中的 Pod,利用 StatefulSet 的有序部署、稳定网络标识和持久化存储实现有状态服务的 Kubernetes 化管理。

StatefulSet 关键配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    version: 3.7.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: true
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
        - id: 1
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: strimzi.io/name
                      operator: In
                      values:
                        - my-cluster-kafka
                topologyKey: kubernetes.io/hostname
    resources:
      requests:
        memory: 8Gi
        cpu: 2
      limits:
        memory: 16Gi
        cpu: 4

StatefulSet 网络标识:每个 Broker 分配稳定的 DNS 名称 my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local,Broker ID 与 Pod 序号对应。跨集群重启后 Broker ID 保持不变。

PodAntiAffinity 与 Rack Awareness:通过 podAntiAffinity 将不同 Broker Pod 调度到不同 K8s 节点,配合 rack 配置实现机架感知副本分布,确保分区副本跨物理故障域分布。

JBOD 存储:Strimzi 支持 JBOD(Just a Bunch of Disks)配置,每个 Broker 可挂载多个独立 PVC,Kafka 日志目录分布在不同卷上,提升 I/O 并行度。

4 Strimzi 的 Entity Operator(Topic Operator + User Operator)

答案:

Entity Operator 是 Strimzi 中负责 Topic 和 User 生命周期管理的子组件,以 Sidecar 或独立 Deployment 形式运行,通过 Kafka Admin API 与 Kafka 集群交互。

Entity Operator 架构

子组件CRD职责通信方式
Topic OperatorKafkaTopic管理 Topic 创建、分区数、副本因子、配置更新Kafka Admin API
User OperatorKafkaUser管理用户凭证生成、ACL 绑定、配额设置Kafka Admin API
TLS Sidecar管理 TLS 证书轮换与推送文件系统共享卷

Topic Operator 工作流程

KafkaTopic CR 创建/更新 → Topic Operator Watch 事件 → 
  反序列化 CR → 校验分区数/副本因子/配置 → 
  调用 AdminClient.createTopics() / AdminClient.describeConfigs() → 
  对比期望状态与当前状态 → 执行变更 → 更新 KafkaTopic.status

User Operator 工作流程

KafkaUser CR 创建/更新 → User Operator Watch 事件 → 
  生成凭证(SCRAM-SHA-512 密码 / TLS 证书)→ 
  存储到 Secret → 调用 Admin API 创建/更新用户 → 
  绑定 ACL 规则 → 设置 Quota → 更新 KafkaUser.status

Topic Operator 双向同步策略

策略行为
UNIDIRECTIONAL(单向)仅从 KafkaTopic CR → Kafka 集群同步,集群内手工变更不被 CR 感知
BIDIRECTIONAL(双向)KafkaTopic CR ↔ Kafka 集群双向同步,集群内手工变更会更新 CR 状态

常见故障处理:Topic Operator 与 Kafka 集群网络中断时,CR 状态进入 NotReady,连接恢复后自动重建 ZooWatcher 并重新同步。证书过期时 TLS Sidecar 自动触发证书轮换。

5 Kafka Broker 的 Rack Awareness 在 K8s 上的实现

答案:

Rack Awareness 确保 Kafka 分区的不同副本分布在不同的故障域(K8s 节点 / 可用区)上,避免单个节点或 AZ 故障导致数据不可用。

K8s 上实现 Rack Awareness 的三种方式

方式配置方法适用场景
Kubernetes Node Labelrack.topologyKey: topology.kubernetes.io/zone多 AZ 集群
手动指定 Rack IDtemplate.pod.topologySpreadConstraints精细化控制
Pod Anti-Affinityaffinity.podAntiAffinity确保不同 Broker 在不同 Node

Strimzi Rack Awareness 配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    rack:
      topologyKey: topology.kubernetes.io/zone
    listeners:
      - name: plain
        port: 9092
        type: internal
    template:
      pod:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
              - weight: 100
                podAffinityTerm:
                  labelSelector:
                    matchExpressions:
                      - key: strimzi.io/name
                        operator: In
                        values:
                          - my-cluster-kafka
                  topologyKey: kubernetes.io/hostname

工作原理:Strimzi 读取每个 Broker Pod 所在 K8s 节点的 topology.kubernetes.io/zone Label,将其注入 Kafka 的 broker.rack 配置。Kafka Controller 在分区分配时确保同一分区的不同副本落在不同 AZ 的 Broker 上。

Pod Topology Spread Constraints 实现细粒度分布

template:
  pod:
    topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            strimzi.io/name: my-cluster-kafka
      - maxSkew: 1
        topologyKey: kubernetes.io/hostname
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            strimzi.io/name: my-cluster-kafka

Rack Awareness 与分区副本分布规则

  • 分区副本分布在 min(rackCount, replicationFactor) 个机架上。
  • ISO(In-Sync Replica)集合优先跨 Rack 分布。
  • Rack 级故障时,剩余 Rack 上的 Follower 副本自动选举为新 Leader。
6 Kafka Topic 的分区与副本配置

答案:

Kafka Topic 的分区和副本配置直接决定数据分布、并行消费能力和容错能力。

KafkaTopic CRD 示例

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: orders-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000        # 7 天保留
    retention.bytes: 107374182400  # 100GB 保留
    segment.bytes: 1073741824      # 1GB 每段
    min.insync.replicas: 2
    cleanup.policy: delete
    compression.type: producer
    max.message.bytes: 10485760    # 10MB
    message.timestamp.type: CreateTime

分区数设计原则

场景建议分区数依据
低吞吐(< 1 MB/s)3-6最小并行度
中吞吐(1-50 MB/s)6-12平衡并行度与开销
高吞吐(50-200 MB/s)12-30充分利用并行消费
超高吞吐(> 200 MB/s)30-100线性扩展

分区数约束:分区数不可减少(Kafka 自身限制),生产环境建议预留 20%-30% 增长空间。过多分区增加文件句柄开销和 Controller 选举耗时。

副本因子配置

副本因子容错能力适用场景
10 节点故障开发环境
21 节点故障非关键业务
32 节点故障(配合 min.insync.replicas=2)生产标准
4-53+ 节点故障金融级高可用

存储容量估算公式

单分区日均存储 = 日均消息数 × 平均消息大小 × 副本因子 × (1 + segment索引开销≈10%) × 保留天数
总存储 = 单分区存储 × 分区数 + 预留 30% buffer
7 Kafka 的 ISR(In-Sync Replica)机制与最小 ISR

答案:

ISR 是 Kafka 高可用和数据一致性的核心机制,定义了与 Leader 保持同步的 Follower 副本集合。

ISR 判断条件

Follower 进入 ISR 条件:
  - Follower 在 replica.lag.time.max.ms(默认 30s)内向 Leader 发送过 Fetch 请求
  - Follower 的 LEO(Log End Offset)与 Leader 的差异在 replica.lag.max.messages 范围内

Follower 踢出 ISR 条件:
  - 超过 replica.lag.time.max.ms 未向 Leader 发起 Fetch 请求
  - Follower LEO 落后 Leader LEO 超过阈值

ISR 收缩与扩展流程

Leader 维护 ISR 集合 → 
  Follower Fetch 请求到达 → 更新 lastCaughtUpTimeMs → 
  定期检查:now - lastCaughtUpTimeMs > replica.lag.time.max.ms → 
  从 ISR 移除 → 写入 ZooKeeper / KRaft → 
  通知 Controller 更新 ISR 元数据 → Propagate 至所有 Broker → 
  Follower 追上后重新加入 ISR

min.insync.replicas 配置

minISR容错含义风险
1Leader 单独在 ISR 即可写入数据丢失,Leader 故障后数据不可恢复
2(推荐)至少 Leader + 1 个 Follower 在 ISR 才能写入可容忍 1 个 ISR 节点故障
replicas = 3, minISR = 3所有副本均需在 ISR任一节点故障中断写入

写入一致性保障

Producer acks=all + min.insync.replicas=2 + replicationFactor=3
  → 消息写入 Leader → Leader 等待 Follower 确认 → 
  至少 2 个副本(含 Leader)持久化成功 → Producer 收到 ACK

ISR 抖动处理:broker 短暂 GC pause 或网络抖动导致 Follower 频繁进出 ISR,表现为 UnderMinIsr 指标波动。通过调大 replica.lag.time.max.ms(如 60s)和确保 Broker 有充足的内存避免 GC 过频来缓解。

8 Kafka 的 Leader 选举与 Controller Broker

答案:

Controller 是 Kafka 集群中负责管理分区和副本状态的 Broker,通过 ZooKeeper(或 KRaft 中的 Metadata Quorum)实现选举。

Controller 职责

职责说明
分区 Leader 选举Broker 宕机时为受影响分区选举新 Leader
ISR 管理维护每个分区的 ISR 集合变更
Topic 管理Topic 创建、删除、分区扩展
副本分配新分区创建时的副本分布策略
元数据广播向所有 Broker 广播集群元数据变更
Preferred Leader 选举触发和完成 Preferred Replica Leader 选举

Controller 选举流程(ZooKeeper 模式):

Broker 启动 → 尝试创建 /controller 临时节点 → 
  创建成功 → 成为 Controller,监听 ZooKeeper 变更 → 
  创建失败 → 监听 /controller 节点 → 
  当前 Controller 宕机 → ZooKeeper 临时节点删除 → 
  剩余 Broker 收到 Watch 通知 → 重新竞争创建 /controller → 
  新 Controller 当选

Controller 分区 Leader 选举策略

策略行为
OfflinePartitionLeaderElectionStrategy仅当没有 Leader 时选举
PreferedReplicaLeaderElectionStrategy优先选举 Preferred Replica 为 Leader
NoOpLeaderElectionStrategy不执行自动选举
Broker 宕机检测 → ZooKeeper 临时节点超时 → 
Controller 收到 Broker 变更通知 → 
遍历该 Broker 所有 Leader 分区 → 
从 ISR 中选取第一个存活 Broker 作为新 Leader → 
更新 ZooKeeper /brokers/topics/<topic>/partitions/<pid>/state → 
向所有 Broker 广播 LeaderAndIsr 请求 → 
各 Broker 更新本地元数据缓存

Controller 故障影响与优化

  • Controller 切换期间(数秒内),分区 Leader 选举、Topic 创建/删除等管理操作中断。
  • controlled.shutdown.enable=true 时 Broker 正常下线由 Controller 优雅迁移分区,避免 Leader 批量切换。
  • KRaft 模式下 Controller 与 Broker 角色分离,Controller 独立为 Quorum 控制器节点。
9 Kafka 的 Exactly-Once 语义与事务

答案:

Kafka 的 Exactly-Once 语义(EOS)通过幂等生产者和事务机制实现消息不丢失、不重复的处理保证。

三种消息传递语义

语义机制实现
At-Most-Onceacks=0,发送即忘可能丢消息
At-Least-Onceacks=1/all,重试机制可能重复
Exactly-Once幂等 + 事务不丢不重复

幂等生产者(Idempotent Producer)

Producer 初始化 → 向 Broker 请求 Producer ID(PID)→ 
  Broker 分配 PID + Epoch → Producer 每条消息附带:
    (PID, Epoch, SequenceNumber) → 
  Broker 按 (PID, Partition) 维护已提交的最大 SequenceNumber → 
  收到消息 → 检查:
    Sequence == lastSeq+1 → 接受
    Sequence <= lastSeq → 重复,丢弃,返回 ACK
    Sequence > lastSeq+1 → 乱序,抛出 OutOfOrderSequenceException
# 幂等生产者配置
enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647

事务(Transactions)

Producer 初始化事务 → initTransactions() → 
  Broker 分配 TransactionalId → 
  beginTransaction() → 
  发送消息(附加 TransactionalId)→ 
  消息写入分区但不向消费者可见 → 
  sendOffsetsToTransaction() → 
  commitTransaction() / abortTransaction() → 
  Broker 写入 Commit/Abort Marker → 
  消费者读取到 Marker 后过滤事务消息

EOS Consumer 配合isolation.level=read_committed,仅读取已提交的事务消息,过滤未提交和已中止的事务消息。

EOS 性能代价:事务写入增加约 20%-30% 延迟,提高 Producer 内存占用,transaction.state.log 内部 Topic 产生额外网络和磁盘 I/O。

10 Kafka 的消息压缩(Snappy / LZ4 / GZip / Zstd)

答案:

Kafka 支持 Producer 端压缩后 Broker 原样存储的压缩策略,减少网络传输和磁盘占用。

压缩算法对比

算法压缩率压缩速度解压速度适用场景
Snappy中等(2x)极快极快低延迟场景,平衡选择
LZ4中等(2x)最快最快低延迟、高吞吐场景首选
GZip高(4x-5x)中等存储密集型、冷数据归档
Zstd最高(4x-6x)中等高压缩率需求,推荐生产使用
uncompressed1x无开销无开销实时性要求极高、消息本身已压缩

压缩配置层级

# Producer 端压缩(推荐)
compression.type=zstd          # none / gzip / snappy / lz4 / zstd

# Broker 端压缩策略
compression.type=producer      # producer(保留原始压缩) / uncompressed(强制解压)

# Topic 级别压缩覆盖
# 通过 KafkaTopic CRD 的 config 指定

压缩在 Broker 端的处理

Producer 压缩消息 → 封装为 MessageSet → 发送到 Broker → 
  Broker.compression.type=producer → 原样写入 Page Cache → 
  批量刷盘到日志段文件 → 
  Consumer Fetch 请求 → Broker 直接发送压缩数据 → 
  Consumer 解压消息集

Strimzi 中的压缩配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    config:
      compression.type: producer       # Broker 端保持 Producer 压缩
      message.max.bytes: 10485760      # 单条消息最大 10MB

Zstd 生产推荐理由:压缩率最高(减少磁盘和网络成本),解压速度快(对 Consumer 几乎无感知),支持字典压缩进一步提升小消息压缩率。

11 Kafka 的日志保留策略(Time / Size / Compaction)

答案:

Kafka 的日志保留策略通过 cleanup.policy 和配套参数控制消息在 Broker 上的生命周期。

三种清理策略

策略cleanup.policy触发条件适用场景
基于时间删除deleteretention.ms / retention.minutes / retention.hours日志、事件流、时序数据
基于大小删除deleteretention.bytes有限存储空间
日志压缩compactmin.cleanable.dirty.ratioKV 状态存储、CDC 数据、配置快照

Strimzi KafkaTopic 保留策略配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: events-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    # 基于时间 + 大小双重删除
    cleanup.policy: delete
    retention.ms: 604800000          # 7 天
    retention.bytes: 107374182400    # 100GB / 分区

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: user-state-compacted
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 6
  replicas: 3
  config:
    cleanup.policy: compact
    min.cleanable.dirty.ratio: 0.5
    delete.retention.ms: 86400000    # 删除标记保留 1 天
    segment.ms: 86400000             # 每天产生新段

日志清理时机与机制

Log Cleaner 线程定期扫描 → 选择脏比率最高的 Log → 
  逐段读取 → 构建 Offset Map(每个 Key 的最新 Offset)→ 
  保留最新 Offset 的消息,删除旧消息 → 合并为干净的新段 → 
  交换新旧段 → 更新索引

保留策略参数优先级

优先级规则
最高已确定为 Active Segment 的日志段不删除
min.compaction.lag.ms / max.compaction.lag.ms 内的段不参与压缩
retention.msretention.bytes 同时设置时先触发的优先生效
Broker 级别 log.retention.* 作为默认值,Topic 级别配置覆盖

生产环境保留策略最佳实践

数据类型推荐策略保留周期原因
业务事件流delete + time7-30 天事件有时效性
状态快照(CDC)compact永久仅需每个 Key 的最新值
告警事件delete + size100GB存储空间受限
审计日志delete + time90 天+合规要求
12 Kafka 的消费者组(Consumer Group)Rebalance 机制

答案:

Consumer Group Rebalance 是 Kafka 在 Consumer 成员变化时重新分配分区所有权的协调过程。

触发 Rebalance 的条件

触发条件说明
Consumer 加入或离开 GroupConsumer 启动、正常退出、崩溃(Session 超时)
Topic 分区数增加新增分区需分配给 Consumer
订阅的 Topic 变更Consumer 通过正则订阅匹配到新 Topic
max.poll.interval.ms 超时Consumer 两次 poll 间隔超过阈值

Rebalance 协议阶段

Phase 1: JoinGroup
  Consumer → Group Coordinator: JoinGroupRequest(memberId, protocols)
  Coordinator 选第一个 Consumer 为 Group Leader
  Coordinator → Group Leader: JoinGroupResponse(所有 Consumer 信息)
  Coordinator → 其他 Consumer: JoinGroupResponse(成员 ID)

Phase 2: SyncGroup  
  Group Leader 执行分配策略 → 生成 Partition Assignments → 
  Group Leader → Coordinator: SyncGroupRequest(分配方案)
  Coordinator → 所有 Consumer: SyncGroupResponse(各 Consumer 的分区列表)

Consumer 收到分配方案 → 清空或保持未提交的 Offset → 开始消费

Rebalance 策略对比

策略算法适用场景
RangeAssignor按 Topic 分区范围连续分配默认策略,平衡性一般
RoundRobinAssignor所有分区轮询均匀分配Consumer 订阅 Topic 完全相同
StickyAssignor尽量保持原有分配,减少分区迁移生产推荐,减少无意义迁移
CooperativeStickyAssignor增量式 Rebalance,不触发 Stop-The-World生产首选(Kafka 2.4+)

Cooperative Rebalance(增量式重平衡)

与传统 Eager Rebalance 差异:
  - 不触发全局 Stop-The-World
  - Consumer 不必在 Rebalance 期间暂停所有分区消费
  - 仅迁移需要重新分配的分区
  - 未被重新分配的分区继续正常消费

Consumer 配置:
  partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

Rebalance 优化措施

# 延长 Session 超时,避免短暂网络波动触发 Rebalance
session.timeout.ms=30000          # Consumer 心跳超时(默认 45s)

# 延长心跳间隔,减少心跳开销
heartbeat.interval.ms=3000        # Consumer 向 Coordinator 发送心跳间隔

# 增加 Poll 间隔上限,防止慢速处理触发 Rebalance
max.poll.interval.ms=600000       # 两次 poll 最大间隔(默认 5min)

# 使用 Cooperative Sticky Rebalance
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# 增加 poll 记录数,减少 poll 调用频率
max.poll.records=500
13 Kafka 的 Group Coordinator 与 Offset 管理

答案:

Group Coordinator 是 Broker 端管理 Consumer Group 的核心组件,负责 Consumer 成员管理、Offset 提交和 Rebalance 协调。

Coordinator 定位机制

Consumer Group ID → 
  hash(groupId) % __consumer_offsets 分区数 → 目标分区 → 
  该分区 Leader 所在 Broker = Group Coordinator

Offset 存储架构(Kafka 0.9+)

旧架构(ZooKeeper 存储 Offset):
  Consumer → ZK 直接写入 Offset
  缺点:ZK 不适合高频写入,OOM 风险

新架构(__consumer_offsets Topic 存储 Offset):
  Consumer → Group Coordinator → __consumer_offsets Topic
  存储格式:Key = <GroupId, Topic, Partition>
           Value = <Offset, Metadata, Timestamp>
  压缩策略:compact(每个分区仅保留最新 Offset)

Offset 提交模式

模式配置行为风险
自动提交enable.auto.commit=trueauto.commit.interval.ms 周期提交Consumer 故障后可能重复消费
同步手动提交consumer.commitSync()阻塞等待提交完成降低吞吐
异步手动提交consumer.commitAsync()非阻塞提交,回调处理结果网络异常时可能丢提交
事务内提交EOS 模式Offset 与消息在同一事务中原子提交仅事务 Producer+Consumer 场景

Coordinator 故障处理

Consumer 心跳超时 → Group Coordinator 移除该 Consumer → 
  触发 Rebalance → 但 Coordinator 自身可能也在故障 → 
  Consumer 收到 NOT_COORDINATOR 错误 → 
  重新 FindCoordinator Request → 定位新 Coordinator → 
  重新 JoinGroup

Consumer Offset 重置策略

策略行为
latest(默认)从最新位置开始(忽略历史消息)
earliest从最早可用偏移量开始(重消费历史)
none未找到 Offset 时抛出异常

Group Coordinator 在 KRaft 模式下的变化:Coordinator 由 KRaft Quorum 中的活跃 Controller 承担,Offset 仍存储在 __consumer_offsets Topic 中。

14 Kafka 的监控指标(JMX Exporter + Prometheus + Grafana)

答案:

Kafka on Kubernetes 的监控体系以 JMX 指标采集为基础,通过 JMX Exporter 转换为 Prometheus 格式,由 Grafana 可视化展示。

Strimzi 监控配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
    template:
      pod:
        metadata:
          annotations:
            prometheus.io/scrape: "true"
            prometheus.io/port: "9404"

JMX Exporter 配置(kafka-metrics-config.yml)

lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
  # Broker 核心指标
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec)><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_$1
    type: GAUGE
  - pattern: kafka.server<type=BrokerTopicMetrics, name=(MessagesInPerSec)><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_$1
    type: GAUGE
  # 网络请求
  - pattern: kafka.network<type=RequestMetrics, name=(TotalTimeMs|RequestQueueTimeMs), request=(Produce|Fetch|FetchConsumer|FetchFollower)><>Count
    name: kafka_network_request_metrics_$2_$1
    labels:
      request: "$3"
  # ISR 与副本
  - pattern: kafka.server<type=ReplicaManager, name=(UnderReplicatedPartitions|UnderMinIsrPartitionCount)><>Value
    name: kafka_server_replica_manager_$1
  - pattern: kafka.server<type=ReplicaManager, name=(IsrShrinksPerSec|IsrExpandsPerSec)><>OneMinuteRate
    name: kafka_server_replica_manager_$1
  # Producer 请求延迟
  - pattern: kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer)><>(\w+)
    name: kafka_network_$3_$1_$2
    labels:
      request: "$3"

Prometheus ServiceMonitor

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: kafka-cluster-metrics
  labels:
    release: prometheus
spec:
  selector:
    matchLabels:
      strimzi.io/cluster: my-cluster
      strimzi.io/kind: Kafka
  endpoints:
    - port: tcp-prometheus
      interval: 15s
      scrapeTimeout: 10s

关键监控指标分类与告警阈值

类别关键指标告警阈值
吞吐kafka_server_broker_topic_metrics_bytesinpersec> 80% 磁盘带宽上限
延迟kafka_network_request_metrics_totaltimems_produceP99 > 100ms
ISRkafka_server_replica_manager_underreplicatedpartitions> 0 持续 5 分钟
消费滞后kafka_consumergroup_group_lag> 10000 或增长率 > 1000/min
磁盘使用kafka_log_log_size> 80% 磁盘容量
Controllerkafka_controller_controllerstate不等于 1(Broker 不是 Controller 但值 = 0 OK)
网络kafka_network_processor_idle_percent< 0.3 持续 10 分钟
GCjvm_gc_pause_secondsP99 > 500ms

Grafana Dashboard 分层

  • Overview Dashboard:集群吞吐、延迟、活跃 Controller、Broker 数量
  • Broker Dashboard:单 Broker 的请求速率、网络 I/O、磁盘 I/O、GC 行为
  • Topic/Partition Dashboard:分区级别读写速率、大小、ISR 状态
  • Consumer Group Dashboard:消费滞后、提交速率、Rebalance 事件
15 Kafka 的性能调优(Producer / Consumer / Broker 参数)

答案:

Kafka 性能调优需从 Producer、Broker、Consumer 三个维度分别优化。

Producer 端调优

# 吞吐优先
batch.size=131072                  # 批量大小 128KB
linger.ms=5                        # 等待 5ms 攒批
compression.type=zstd              # 压缩算法
buffer.memory=134217728            # 发送缓冲区 128MB
max.in.flight.requests.per.connection=5
acks=1                             # Leader 确认即可

# 可靠性优先
acks=all                           # 所有 ISR 确认
enable.idempotence=true            # 幂等生产者
max.in.flight.requests.per.connection=5
retries=2147483647                 # 无限重试
delivery.timeout.ms=120000         # 超时 2 分钟

Broker 端调优

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
  kafka:
    config:
      # 网络与 I/O
      num.network.threads: 8       # 网络线程数(处理网络请求)
      num.io.threads: 16           # I/O 线程数(处理磁盘 I/O)
      socket.send.buffer.bytes: 1048576
      socket.receive.buffer.bytes: 1048576
      socket.request.max.bytes: 104857600

      # Page Cache 与刷盘
      log.flush.interval.messages: 9223372036854775807  # 不基于消息数刷盘
      log.flush.interval.ms: 9223372036854775807        # 不基于时间刷盘(依赖 OS)

      # 日志段与索引
      log.segment.bytes: 1073741824                     # 1GB 段
      log.index.interval.bytes: 4096                   # 稀疏索引密度

      # 副本机制
      num.replica.fetchers: 4       # Follower 拉取线程数
      replica.fetch.max.bytes: 10485760
      replica.socket.timeout.ms: 30000

      # 压缩
      compression.type: producer

      # Leader 均衡
      auto.leader.rebalance.enable: true
      leader.imbalance.per.broker.percentage: 10
      leader.imbalance.check.interval.seconds: 300

    resources:
      requests:
        memory: 8Gi
        cpu: 2
      limits:
        memory: 16Gi
        cpu: 4
    jvmOptions:
      -Xms: 6g
      -Xmx: 6g
      -XX:+UseG1GC
      -XX:MaxGCPauseMillis: 20
      -XX:InitiatingHeapOccupancyPercent: 35
      -XX:+DisableExplicitGC

Consumer 端调优

# 吞吐优先
fetch.min.bytes=1048576            # 每次 Fetch 最少拉取 1MB
fetch.max.wait.ms=500              # 等待 500ms 攒批
max.partition.fetch.bytes=10485760 # 每分区最大 10MB
max.poll.records=500               # 每次 Poll 最多 500 条

# 延迟优先
fetch.min.bytes=1                  # 有数据就返回
fetch.max.wait.ms=50               # 最多等待 50ms

# Rebalance 优化
session.timeout.ms=30000
heartbeat.interval.ms=3000
max.poll.interval.ms=600000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

# Offset 管理
enable.auto.commit=false           # 手动提交,避免自动提交带来的重复消费

JVM 与 OS 调优

调优项配置说明
Page Cachevm.dirty_background_ratio=5后台刷脏页阈值
文件描述符ulimit -n 100000Kafka 大量文件需求
Swapvm.swappiness=1最小化 Swap
磁盘调度noop / noneSSD 无 I/O 调度器
网络调优net.core.rmem_max=134217728套接字接收缓冲区
16 Kafka 的存储配置(PVC / JBOD / Tiered Storage)

答案:

Strimzi 支持普通 PVC、JBOD 和 Tiered Storage 三种存储模式。

单 PVC 存储

spec:
  kafka:
    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
      deleteClaim: false

JBOD 存储

spec:
  kafka:
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 200Gi
          class: fast-ssd
          deleteClaim: false
        - id: 1
          type: persistent-claim
          size: 200Gi
          class: fast-ssd
          deleteClaim: false

JBOD 与单 PVC 对比

维度单 PVCJBOD
I/O 并行度单卷串行多卷并行
容量扩展需扩容已有 PVC可添加新卷
日志目录分布单目录多目录轮询写入
磁盘故障隔离全局故障单卷故障不影响其他卷
成本高(多块磁盘)

Tiered Storage(Strimzi 0.39+,Kafka 3.6+ 特性):

spec:
  kafka:
    config:
      # 启用 Tiered Storage
      remote.log.storage.system.enable: true
      remote.log.storage.manager.class.path: /opt/kafka/libs/remote-storage-*.jar
      remote.log.metadata.manager.impl.class: org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
      rlmm.config.remote.log.metadata.topic.replication.factor: 3

      # 本地保留
      local.retention.ms: 86400000  # 本地保留 1 天
      local.retention.bytes: -2     # 本地无大小限制

      # 远程保留
      retention.ms: 2592000000      # 远程保留 30 天

Tiered Storage 架构

Active Segments(本地 SSD) <── 热数据,最近 1 天
Closed Segments(远程 S3/MinIO) <── 冷数据,最近 30 天
Deleted Segments <── 过期数据,由 Log Cleaner 线程清除
  • 优势:降低本地存储成本,冷数据使用对象存储,支持历史数据回溯。
  • 限制:需要 S3 兼容对象存储(MinIO / Ceph RGW / AWS S3),性能会受远程读取延迟影响。

K8s StorageClass 选择

StorageClass性能适用场景
fast-ssd(NVMe SSD)极高 IOPS高吞吐、低延迟生产环境
ssd(SATA SSD)高 IOPS标准生产环境
hdd低 IOPS冷数据、大规模历史存储
17 Kafka 的 SSL / TLS 加密与 SASL 认证

答案:

Strimzi 通过 Listener 配置同时支持 TLS 传输加密和多种 SASL 认证机制,管理客户端与 Broker 之间以及 Broker 之间的通信安全。

Listener 类型与安全协议

Listener Type安全协议用途
internal + tls: trueTLS 传输加密Broker 间通信
internal + SASLTLS + SASL 认证客户端连接认证
external + tls: trueTLS外部客户端加密连接
external + SASLTLS + SASL外部客户端认证连接

TLS 加密通信配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    listeners:
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: true
        authentication:
          type: tls
        configuration:
          bootstrap:
            nodePort: 32094
          brokers:
            - broker: 0
              nodePort: 32095
            - broker: 1
              nodePort: 32096
            - broker: 2
              nodePort: 32097
    authorization:
      type: simple

SASL 认证机制对比

认证类型凭证存储密码安全配置复杂度性能影响
SCRAM-SHA-512ZooKeeper / KRaft加密哈希高(需密码哈希)
TLS 客户端证书Kubernetes Secret极高
OAuth 2.0OAuth Provider
PlainZooKeeper / KRaft明文极低最低

KafkaUser SCRAM-SHA-512 配置

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Read
          - Write
          - Describe
        host: "*"
      - resource:
          type: group
          name: my-consumer-group
          patternType: literal
        operations:
          - Read
        host: "*"

客户端连接配置(SCRAM-SHA-512)

bootstrap.servers=my-cluster-kafka-tls-bootstrap.kafka.svc:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="my-user" \
  password="<password-from-secret>";
ssl.truststore.location=/path/to/ca.crt
ssl.truststore.type=PEM

TLS 证书管理:Strimzi 使用 Cluster Operator 内置 CA 自动签发和管理证书。CA 证书和 Broker 证书存储在 Kubernetes Secret 中,证书过期前自动轮换。也可对接外部 CA(cert-manager)签发证书。

Listener 安全策略矩阵

plain (9092):   No TLS + No Auth → 仅测试环境
tls (9093):     TLS + No Auth → 加密但不认证
tls (9093):     TLS + SASL → 加密 + 用户认证(生产推荐)
external (9094): TLS + SASL → 外部客户端安全接入
18 Kafka 的 ACL / RBAC 权限控制

答案:

Strimzi 通过 KafkaUser CRD 和 Simple Authorization 实现细粒度的 ACL 访问控制。

KafkaUser ACL 定义

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: app-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # 主题读写权限
      - resource:
          type: topic
          name: orders-*
          patternType: prefix
        operations:
          - Read
          - Write
          - Describe
          - DescribeConfigs
        host: "*"

      # 消费者组权限
      - resource:
          type: group
          name: order-processor
          patternType: literal
        operations:
          - Read
        host: "*"

      # 集群操作权限
      - resource:
          type: cluster
          name: kafka-cluster
        operations:
          - Describe
          - DescribeConfigs

      # 事务 ID 权限
      - resource:
          type: transactionalId
          name: app-tx-*
          patternType: prefix
        operations:
          - Write
          - Describe

ACL 资源类型与操作映射

资源类型常用操作说明
topicRead / Write / Describe / DescribeConfigs / Alter / AlterConfigs / Create / DeleteTopic 级别控制
groupRead / DescribeConsumer Group 控制
clusterDescribe / DescribeConfigs / Alter / IdempotentWrite / Create集群级别控制
transactionalIdWrite / Describe事务 Producer 控制

ACL Pattern 匹配

PatternType示例匹配
literalorders-topic精确匹配 orders-topic
prefixorders-匹配 orders-topicorders-dlq

Super User 配置:不受 ACL 限制,用于内部组件(Entity Operator、Cruise Control、MirrorMaker 2)。

spec:
  kafka:
    authorization:
      type: simple
      superUsers:
        - CN=entity-operator
        - CN=cruise-control
        - CN=my-mirror-maker-2

ACL 管理技巧

  • KafkaUser 为单元,每个微服务应用一个 KafkaUser 资源。
  • 使用 patternType: prefix 简化多 Topic 权限管理。
  • host 字段设为 * 表示不限制 IP,生产环境建议限定 Pod CIDR。
  • ACL 修改后 User Operator 通过 Admin API 同步到 Kafka,延迟通常在数秒内。
19 Kafka 的 MirrorMaker 2 跨集群复制

答案:

MirrorMaker 2(MM2)基于 Kafka Connect 框架实现,支持集群间的 Topic、Consumer Group Offset 和 ACL 复制。

KafkaMirrorMaker2 CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: dr-mirror
spec:
  version: 3.7.0
  replicas: 2
  connectCluster: target-cluster
  clusters:
    - alias: source-cluster
      bootstrapServers: source-kafka-bootstrap.source-ns.svc:9092
    - alias: target-cluster
      bootstrapServers: target-kafka-bootstrap.target-ns.svc:9092
  mirrors:
    - sourceCluster: source-cluster
      targetCluster: target-cluster
      sourceConnector:
        config:
          replication.factor: 3
          offset-syncs.topic.replication.factor: 3
          sync.topic.acls.enabled: true
          refresh.topics.enabled: true
          refresh.topics.interval.seconds: 60
          tasks.max: 4
      topicsPattern: "orders.*|payments.*|inventory.*"
      groupsPattern: "order-.*|payment-.*"
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 3
          refresh.groups.enabled: true
          refresh.groups.interval.seconds: 60
          sync.group.offsets.enabled: true
          emit.checkpoints.enabled: true
          tasks.max: 2
      heartbeatConnector:
        config:
          heartbeats.topic.replication.factor: 3
          tasks.max: 1

MM2 核心 Connector

Connector功能
MirrorSourceConnector从源集群消费消息,写入目标集群(Topic 名加 source-cluster. 前缀)
MirrorCheckpointConnector同步 Consumer Group Offset(从源集群到目标集群)
MirrorHeartbeatConnector生成心跳消息,监控复制链路的健康状况

MM2 复制语义

源集群 Topic: orders
目标集群 Topic: source-cluster.orders

复制逻辑:
  MM2 从源集群消费 orders → 写入目标集群 source-cluster.orders →
  依赖 Kafka Connect 的 Exactly-Once 支持保证不丟不重 →
  心跳 Topic(source-cluster.heartbeats)持续写入监控延迟 →
  Checkpoint Topic 记录 Consumer Group Offset 映射

MM2 部署架构

[Source DC]
  Kafka Cluster (source-cluster)
         |
  MM2 Workers (active-standby)
         |
[Target DC]
  Kafka Cluster (target-cluster)
   - source-cluster.orders
   - source-cluster.payments
   - offsets synced

生产关注事项

  • MM2 Worker 建议部署在目标集群侧,减少跨 DC 延迟对写入的影响。
  • tasks.max 按源集群的分区总数合理设置,单个 Task 处理的分区数不超过 20。
  • Offset 同步间隔(emit.checkpoints.interval.seconds)按 RPO 目标设置,典型值 60s。
  • MM2 对网络带宽要求高,需确保跨 DC 链路带宽足够(估算:源集群写入吞吐 x 压缩率)。
20 Kafka 的 Cruise Control 自动均衡

答案:

Cruise Control 是 Kafka 集群的自动负载均衡器,通过分析 Broker 的资源利用率和副本分布,自动生成并执行分区重分配方案。

Strimzi 集成 Cruise Control

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  cruiseControl:
    config:
      goals: >-
        com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal        
      capacity.configFile: /opt/cruise-control/config/capacity.json
      hard.goals: >-
        com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal        
      anomaly.detection.goals: >-
        com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal        
    resources:
      requests:
        memory: 1Gi
        cpu: 500m
      limits:
        memory: 2Gi
        cpu: 1

KafkaRebalance CRD 触发均衡

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec:
  goals:
    - RackAwareGoal
    - ReplicaCapacityGoal
    - DiskCapacityGoal
    - NetworkInboundCapacityGoal
    - NetworkOutboundCapacityGoal
    - CpuCapacityGoal
  skipHardGoalCheck: false

Cruise Control 工作流程

KafkaRebalance CR 创建 → Cruise Control 收到请求 →
  Step 1: 收集集群指标(CPU / 磁盘 / 网络 / 副本数)→
  Step 2: 构建负载模型 → 
  Step 3: 运行优化器(Goal-based)→ 生成分区重分配方案 →
  Step 4: 更新 KafkaRebalance.status(ProposalReady)→
  用户审批方案 → Approve → 
  Step 5: 执行分区重分配(Kafka Admin API)→ 
  监控进度 → 完成更新 KafkaRebalance.status(Ready)

优化目标(Goals)分类

目标类型说明
RackAwareGoalHard分区副本跨 Rack 分布,必须满足
ReplicaCapacityGoalSoft副本数不超过 Broker 容量
DiskCapacityGoalSoft磁盘使用率跨 Broker 均衡
CpuCapacityGoalSoftCPU 使用率跨 Broker 均衡
NetworkInboundCapacityGoalSoft入站网络流量均衡
NetworkOutboundCapacityGoalSoft出站网络流量均衡
LeaderBytesInDistributionGoalSoftLeader 写入流量均衡
ReplicaDistributionGoalSoft分区副本数跨 Broker 均衡
TopicReplicaDistributionGoalSoft同一 Topic 副本跨 Broker 均匀分布

手动审批模式

# 等待方案生成
kubectl wait kafkarebalance my-rebalance --for=condition=ProposalReady --timeout=300s

# 审批并执行
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve

# 停止正在执行的 Rebalance
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=stop

异常检测(Anomaly Detector):自动检测 Broker 故障、磁盘接近写满、指标异常波动等场景,自动触发 Rebalance 或告警。

21 Kafka 的备份与灾难恢复

答案:

Kafka on Kubernetes 的备份与灾难恢复覆盖数据层、配置层和元数据层。

备份层级

层级内容工具频率
Kafka 数据Topic 消息数据Velero / Strimzi Drain Cleaner / S3 Tiered Storage持续 / 增量
Kafka 配置CRD 资源定义(Kafka/KafkaTopic/KafkaUser 等)kubectl / Velero每次变更后
Consumer GroupOffset 数据MirrorMaker 2 / 内置 Offset Topic近实时
ZooKeeper元数据Velero / ZooKeeper Snapshot定时
证书TLS 证书和 CAcert-manager / Secret 备份每次轮换后

Velero 备份方案

apiVersion: velero.io/v1
kind: Backup
metadata:
  name: kafka-full-backup
  namespace: velero
spec:
  includedNamespaces:
    - kafka
  labelSelector:
    matchLabels:
      strimzi.io/cluster: my-cluster
  includeClusterResources: true
  storageLocation: default
  snapshotVolumes: true
  hooks:
    resources:
      - name: pause-kafka
        pre:
          exec:
            container: kafka
            command:
              - /bin/bash
              - -c
              - |
                # 触发刷盘后暂停写入
                /opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
                  --entity-type brokers --entity-default \
                  --alter --add-config unclean.leader.election.enable=false                

Velero 恢复流程

1. 准备恢复目标 K8s 集群(确保 StorageClass 兼容)
2. 创建 Velero Restore CR
   velero restore create --from-backup kafka-full-backup
3. Velero 恢复 PVC 和 PV 数据
4. Strimzi Operator 恢复 StatefulSet / ConfigMap / Secret
5. Kafka Pod 启动,加载恢复的数据
6. 验证集群状态
   - kubectl exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --list
   - 检查 ISR 和 Under Replicated Partitions
7. 恢复 Consumer 从最近的 Committed Offset 开始消费

跨集群灾难恢复(Active-Standby)

[Active DC]                        [Standby DC]
Kafka Cluster ──MM2──→ Kafka Cluster
     │                              │
Clients Produce               Clients Consume
(主写入)                    (DR 切换后接管)

切换流程:
1. 停止 Active 侧 Producer
2. 确认 MM2 同步延迟 = 0(无数据丢失)
3. 停止 MM2 Connector
4. 切换 Client DNS / Bootstrap Server 到 Standby
5. 启动 Standby 侧 Producer
6. Consumer 从 Standby 集群继续消费

RPO / RTO 目标

方案RPORTO适用场景
MM2 异步复制< 30s< 5min跨 Region 灾备
CRD GitOps 备份< 5min< 30min配置灾难恢复
Velero 备份< 1h< 2h数据完全恢复
Tiered Storage< 1min< 10min对象存储层面的数据恢复
22 Kafka 的 KRaft(ZooKeeper-less)架构

答案:

KRaft(Kafka Raft)是 Kafka 从 2.8 版本引入的元数据管理协议,用 Raft 一致性算法替代 ZooKeeper,Kafka 3.5 版本起标记为生产就绪。

ZooKeeper 模式 vs KRaft 模式

维度ZooKeeper 模式KRaft 模式
元数据存储ZooKeeper 集群KRaft Quorum(Kafka 内部)
外部依赖需独立部署和运维 ZK 集群无外部依赖
Controller 角色Controller + Broker 同节点可独立 Quorum Controller 节点
元数据同步ZK Watch + ZK 写入Raft Log Replication
分区数上限约 200K(受 ZK 限制)数百万分区
故障恢复速度分钟级秒级
部署复杂度需管理两个集群(Kafka + ZK)单一集群管理

Strimzi KRaft 模式配置

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kraft-cluster
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 3.7.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
    storage:
      type: persistence-claim
      size: 200Gi

KRaft NodePool 角色分离

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: controller
  labels:
    strimzi.io/cluster: my-kraft-cluster
spec:
  replicas: 3
  roles:
    - controller
  storage:
    type: persistent-claim
    size: 20Gi
  resources:
    requests:
      memory: 2Gi
      cpu: 1

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: broker
  labels:
    strimzi.io/cluster: my-kraft-cluster
spec:
  replicas: 3
  roles:
    - broker
  storage:
    type: persistent-claim
    size: 200Gi
  resources:
    requests:
      memory: 8Gi
      cpu: 2

KRaft Quorum 选举

Controller 节点(3 节点 Quorum):  majority = floor(3/2) + 1 = 2
  任意 1 个 Controller 故障 → Quorum 仍可用
  2 个 Controller 故障 → Quorum 不可用 → 元数据不可写

Controller 角色可选组合:
  - 纯 Controller 节点(推荐): roles: [controller]
  - 纯 Broker 节点: roles: [broker]
  - 混合节点: roles: [controller, broker]

KRaft 迁移路径(ZK → KRaft):

Kafka 3.5+ → 启用 ZK 到 KRaft 的迁移工具 → 
  Step 1: 部署 KRaft Controller Quorum → 
  Step 2: 配置 Broker 同时连接 ZK 和 KRaft → 
  Step 3: 元数据从 ZK 迁移至 KRaft → 
  Step 4: 验证 KRaft 中元数据完整性 → 
  Step 5: 从 Broker 配置中移除 ZK 连接 → 
  Step 6: 下线 ZooKeeper 集群

KRaft 生产就绪注意事项

  • Controller 节点磁盘存储元数据日志(__cluster_metadata),建议使用高性能 SSD。
  • Controller Quorum 节点数应为奇数(3 或 5),不允许扩展/收缩操作。
  • 使用 strimzi.io/kraft: enabled 注解时必须同时启用 strimzi.io/node-pools: enabled
  • Strimzi 0.39+ KRaft 模式仍标记为 GA 预览(详见 Strimzi 版本发布说明)。
23 Kafka 的在线分区重分配

答案:

在线分区重分配是在不中断服务的情况下,将分区副本从某些 Broker 迁移到其他 Broker 的操作。

触发场景

场景说明
集群扩容新增 Broker 后需要重新平衡分区
集群缩容Broker 下线前迁移其分区
热点消除将热点分区从高负载 Broker 迁移
磁盘使用不均均衡各 Broker 的磁盘使用率
Rack Awareness修正不符合 Rack 分布的副本

Strimzi 通过 Cruise Control 执行重分配

# 方式一:KafkaRebalance CRD
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec:
  goals:
    - RackAwareGoal
    - ReplicaCapacityGoal
    - DiskCapacityGoal
EOF

# 查看方案
kubectl describe kafkarebalance my-rebalance

# 批准执行
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve

手动分区重分配(kafka-reassign-partitions.sh)

# 1. 生成 Topic 列表(JSON 格式)
cat > topics-to-move.json <<EOF
{
  "topics": [
    {"topic": "orders"},
    {"topic": "payments"}
  ],
  "version": 1
}
EOF

# 2. 生成重分配方案
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --topics-to-move-json-file /tmp/topics-to-move.json \
  --broker-list "0,1,2,3" \
  --generate

# 3. 执行重分配
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/reassignment.json \
  --execute

# 4. 验证进度
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/reassignment.json \
  --verify

重分配的限流与影响控制

# Kafka CR 中配置限流
spec:
  kafka:
    config:
      # 副本间数据搬迁的限流(B/s),防止影响正常流量
      leader.replication.throttled.rate: 104857600    # 100MB/s
      follower.replication.throttled.rate: 104857600  # 100MB/s

重分配执行机制

1. Controller 将分区标记为 Reassigning
2. 目标 Broker 启动 ReplicaFetcher 线程
3. 从 Leader 拉取分区数据到目标 Broker
4. 目标 Broker 数据追平 ISR 后,加入 ISR
5. Controller 从原 Broker 移除旧副本
6. 更新分区元数据
7. 分区回到正常状态
  • 整个过程中生产者和消费者不中断。
  • 重分配期间集群带宽消耗增加,需设置限流避免影响生产流量。
  • 大规模重分配通过 Cruise Control 自动管理更可靠。
24 Kafka 的 Connect 框架与 Connector 管理

答案:

Kafka Connect 是 Kafka 的数据集成框架,用于将外部系统数据导入 Kafka(Source Connector)或从 Kafka 导出到外部系统(Sink Connector)。

KafkaConnect CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  replicas: 2
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
  build:
    output:
      type: docker
      image: registry.example.com/kafka-connect:latest
    plugins:
      - name: debezium-mysql
        artifacts:
          - type: zip
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.0.Final/debezium-connector-mysql-2.6.0.Final-plugin.zip
            sha512sum: <sha512-checksum>

KafkaConnector CRD

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 2
  config:
    database.hostname: mysql.example.com
    database.port: 3306
    database.user: debezium
    database.password: ${secret:mysql-secret:password}
    database.server.id: 1
    topic.prefix: cdc
    database.include.list: orders,payments
    table.include.list: orders.orders,payments.transactions
    snapshot.mode: initial
    include.schema.changes: true
    transforms: Reroute,ExtractField
    transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.Reroute.topic.regex: cdc.orders.orders
    transforms.Reroute.topic.replacement: orders-events

Connect Worker 模式

模式配置适用场景
Distributedreplicas > 1生产环境,高可用
Standalonereplicas = 1开发测试,非关键

Connect 内部 Topic

Topic用途
config.storage.topic存储 Connector 配置
offset.storage.topic存储 Connector Offset
status.storage.topic存储 Connector 和 Task 状态

Connector 运行机制

Connect Worker 启动 → 
  读取 config.storage.topic 中的 Connector 配置 → 
  Worker 分配 Connector → Tasks 数量 → 
  每个 Task 独立运行(并行度 = tasksMax)→ 
  Source Task: poll() → 写入 Kafka → commit() offset
  Sink Task: 消费 Kafka → put() 到外部系统 → flush() → commit() offset

Connector 失败处理

配置行为
errors.tolerance=none任务失败立即停止(默认)
errors.tolerance=all忽略所有错误,跳过错误记录
errors.deadletterqueue.topic.name错误记录写入死信队列
errors.deadletterqueue.context.headers.enable=true死信消息附带上下文 Headers
25 Kafka 的 Schema Registry 集成

答案:

Schema Registry 提供 Avro / Protobuf / JSON Schema 的集中式模式管理,确保 Producer 和 Consumer 之间的数据兼容性。

Strimzi Schema Registry 部署

# Schema Registry 以独立 Deployment 部署
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  labels:
    app: schema-registry
spec:
  replicas: 2
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: schema-registry
          image: confluentinc/cp-schema-registry:7.6.0
          env:
            - name: SCHEMA_REGISTRY_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
              value: PLAINTEXT://my-cluster-kafka-bootstrap:9092
            - name: SCHEMA_REGISTRY_KAFKASTORE_TOPIC
              value: _schemas
            - name: SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL
              value: SSL
            - name: SCHEMA_REGISTRY_LISTENERS
              value: http://0.0.0.0:8081
            - name: SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL
              value: backward
            - name: SCHEMA_REGISTRY_ACCESS_CONTROL_ENABLED
              value: "true"
          ports:
            - containerPort: 8081
          resources:
            requests:
              memory: 1Gi
              cpu: 500m
            limits:
              memory: 2Gi
              cpu: 1

模式兼容性策略

策略含义允许变更
BACKWARD新 Schema 读取旧数据不失败添加可选字段、删除字段
FORWARD旧 Schema 读取新数据不失败添加可选字段
FULL向前向后都兼容仅添加可选字段
BACKWARD_TRANSITIVESchema 对所有历史版本向后兼容从初始版本起仅添加可选字段
FORWARD_TRANSITIVESchema 对所有历史版本向前兼容从初始版本起仅删除字段
FULL_TRANSITIVE所有版本间全兼容最严格约束
NONE不检查兼容性任何修改

客户端集成

# Producer 配置
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://schema-registry.kafka.svc:8081
auto.register.schemas=true

# Consumer 配置
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://schema-registry.kafka.svc:8081
specific.avro.reader=true

生产关注事项

  • _schemas Topic 必须设置高副本因子(>= 3)和 cleanup.policy=compact
  • Schema Registry 自身无状态,可水平扩展,但 _schemas Topic Leader 在同一时刻只有一个 Writer。
  • 避免频繁注册新 Schema,每个新 Schema 写入 _schemas Topic 成本较高。
  • Schema Registry 故障不影响 Kafka Broker 核心链路,但 Producer / Consumer 序列化 / 反序列化会中断。
26 Kafka 的 Quota 限流机制

答案:

Kafka 的 Quota 机制在客户端(Client ID / User)粒度上对网络带宽和请求速率进行限流,防止单个客户端耗尽 Broker 资源。

Quota 类型

Quota 类型配置参数粒度说明
网络带宽producer_byte_rate / consumer_byte_rateClient ID / User限制生产/消费的字节速率(B/s)
请求速率request_percentageClient ID / User限制占用 I/O 线程和网络线程的时间百分比
Controller 变更controller_mutation_rateClient ID / User限制客户端创建 Topic、分区、删除 Topic 的速率
CPUcpu_utilizationClient ID / User限制连接占用的 CPU 时间百分比

Strimzi 中通过 KafkaUser 设置 Quota

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: my-user
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
  quotas:
    producerByteRate: 10485760    # 10MB/s 生产限流
    consumerByteRate: 20971520    # 20MB/s 消费限流
    requestPercentage: 50         # 50% I/O 线程占用

Quota 生效优先级

<user, client-id> Quota → 最高优先级
<user> Quota → 次优先级
<client-id> Quota → 再次优先级
<user, client-id> default → 默认 Quota

动态设置 Quota(kafka-configs.sh)

# 设置 User 级别生产带宽限流
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type users --entity-name my-user \
  --alter --add-config producer_byte_rate=10485760

# 设置 Client ID 级别请求率限流
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type clients --entity-name app-producer \
  --alter --add-config request_percentage=30

# 查看当前 Quota 配置
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type users --entity-name my-user --describe

限流时的行为

  • 超出 producer_byte_rate / consumer_byte_rate 的网络数据传输被延迟,连接不会断开。
  • 超出 request_percentage 的请求被延迟处理,客户端感知为请求延迟增加。
  • 延迟机制由 Broker 端的 quota.window.num(采样窗口数)和 quota.window.size.seconds(窗口大小)控制。

Quota 监控

指标说明
`kafka.server:type={ProduceFetch},name=throttle-time`
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec入站字节速率
kafka.server:type=ClientQuotaManager,name=ThrottleTime总限流延迟统计
27 Kafka 的热点分区检测与处理

答案:

热点分区指流量、数据量或延迟显著高于同 Topic 其他分区的分区,会导致 Broker 负载不均、Consumer 消费倾斜。

热点分区的成因

成因说明
消息 Key 分布不均特定 Key 的消息量远大于其他 Key
分区分配倾斜流量高分区集中在少数 Consumer
Broker 硬件差异Leader 分布在性能不同的 Broker 上
同一 Broker 承载过多 LeaderController 分区分配策略导致 Leader 集中

检测方法

# 按 Topic 分组查看各分区的写入字节速率
kubectl exec my-cluster-kafka-0 -- bin/kafka-run-class.sh \
  kafka.tools.JmxTool \
  --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=orders \
  --reporting-interval 1000

# 通过 kafka-log-dirs.sh 查看各分区磁盘占用
kubectl exec my-cluster-kafka-0 -- bin/kafka-log-dirs.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic-list orders

# 查看 Topic 分区 Leader 分布
kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe --topic orders

Prometheus 热点分区查询

# 单分区写入速率(检测热点)
topk(10, sum(rate(kafka_server_broker_topic_metrics_bytesinpersec[5m])) by (topic, partition))

# 分区大小 Top 10
topk(10, sum(kafka_log_log_size) by (topic, partition))

# 单 Broker 上 Leader 分区数
count(kafka_server_replicamanager_leadercount) by (pod)

处理策略

策略方法效果
修改分区 Key增加 Key 粒度或散列化 Key从源头均衡数据分布
增加分区数提高 Topic 分区数分散数据到更多分区
Leader 均衡触发 Preferred Leader Election均匀分布 Leader
分区重分配Cruise Control / kafka-reassign-partitions迁移分区到低负载 Broker
Consumer 调整调整 max.poll.records、增加 Consumer 实例消除消费倾斜

Strimzi 自动 Leader 均衡

spec:
  kafka:
    config:
      auto.leader.rebalance.enable: true
      leader.imbalance.per.broker.percentage: 10     # 不平衡阈值 10%
      leader.imbalance.check.interval.seconds: 300    # 每 5 分钟检查

Consumer 端处理热点

// 自定义 Partitioner:热点 Key 拆分为多个子 Key
public class ConsistentHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        String keyStr = (String) key;
        if (isHotKey(keyStr)) {
            // 热点 Key 附加随机后缀散列
            keyStr = keyStr + "-" + ThreadLocalRandom.current().nextInt(10);
        }
        return Math.abs(keyStr.hashCode()) % cluster.partitionCountForTopic(topic);
    }
}
28 Kafka 的生产者幂等性(Idempotent Producer)

答案:

生产者幂等性确保在网络重试等异常场景下,同一条消息不会在 Broker 中被重复写入,是 Exactly-Once 语义的基础。

幂等性实现原理

Producer 初始化 → 向 Broker 发送 InitProducerIdRequest → 
  Broker 分配 Producer ID(PID,单调递增的唯一 ID)→ 
  Producer 为每个 <PID, TopicPartition> 维护 Sequence Number(从 0 开始)→ 
  发送消息时附加 <PID, Epoch, SequenceNumber> → 
  Broker 端维护每个 <PID, TopicPartition> 已确认的最大 SequenceNumber → 
  收到消息:
    1. Sequence == lastSeq+1 → 接受,更新 lastSeq
    2. Sequence <= lastSeq → 重复消息,丢弃,返回 ACK(不重复写入)
    3. Sequence > lastSeq+1 → 乱序,抛出 OutOfOrderSequenceException

幂等性配置

enable.idempotence=true                    # 启用幂等
acks=all                                    # 必须设为 all
max.in.flight.requests.per.connection=5     # ≤ 5
retries=2147483647                          # 无限重试

Broker 端幂等性相关存储

Producer ID 分配:
  - ZK 模式:存储在 /latest_producer_id_block ZNode 中
  - KRaft 模式:存储在 Controller 元数据日志中

Sequence Number 维护:
  - 每个 Broker 的内存中维护 Snapshot 文件
  - 保存最近 N 个 Producer 的 <PID, Epoch, SequenceNumber> 状态
  - Snapshot 定期写入磁盘,Broker 启动时恢复

幂等性的局限

场景幂等性是否有效说明
Producer 内部重试有效相同 PID + SequenceNumber 被去重
Producer 重启有效(PID 不变)但 PID 可能因 Broker 再分配而改变
Producer 实例更换需配合事务新实例分配新 PID,无法跨 PID 去重
跨 Session 消息需配合事务不同 Producer Session 无法保证去重

与事务的配合

幂等生产者 → 单分区 / 单 Session 内的去重保证
事务生产者 → 
  跨分区原子写入 +
  Consumer-Transform-Produce 循环中的 Exactly-Once +
  跨 Session 去重(基于 TransactionalId 而非 PID)

幂等性丢失服务保护

  • 设置 transactional.id 将去重标识从 PID 切换到 TransactionalId。
  • 配合 transaction.timeout.ms 控制事务超时后的资源回收。
  • max.in.flight.requests.per.connection=5 与幂等性兼容(Kafka 1.0+),不必设为 1。
29 Kafka 常见故障排查

答案:

Kafka on Kubernetes 常见故障场景与排查方法。

场景一:Broker Pod 无法启动

症状:Pod 状态 CrashLoopBackOff / Pending

排查步骤:
  1. kubectl describe pod my-cluster-kafka-0
  2. kubectl logs my-cluster-kafka-0 -c kafka
  3. 常见原因:
     - PVC 无法挂载(StorageClass 不兼容 / PV 未创建)
     - 内存不足(JVM OOMKilled)
     - ZooKeeper 连接失败(ZK Pod 未就绪 / DNS 不可解析)
     - 证书过期(TLS 证书校验失败)
     - config 中的 broker.id 冲突

  4. 解决:
     - kubectl describe pvc data-my-cluster-kafka-0
     - 检查 JVM heap 配置:-Xms 与 -Xmx 与容器 memory limit 匹配
     - 验证 ZK 服务:kubectl exec my-cluster-zk-0 -- bin/zookeeper-shell.sh localhost:2181 stat

场景二:Under Replicated Partitions 持续 > 0

症状:kafka_server_replica_manager_underreplicated_partitions > 0 持续超过 5 分钟

排查步骤:
  1. kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
  2. 检查受影响 Broker 的状态
  3. 常见原因:
     - 某一 Broker 宕机或 OOM 重启
     - 网络分区导致 Broker 间通信中断
     - 磁盘 I/O 瓶颈导致 Follower 追不上 Leader
     - Replica Fetcher 线程不足

  4. 解决:
     - 恢复故障 Broker
     - 调大 num.replica.fetchers(默认 1 → 4)
     - 检查磁盘 I/O 指标,必要时迁移分区

场景三:Consumer Lag 持续增长

症状:Consumer Group Lag 单调递增

排查步骤:
  1. kubectl exec my-cluster-kafka-0 -- bin/kafka-consumer-groups.sh \
       --bootstrap-server localhost:9092 --group order-processor --describe
  2. 对比 Produce 速率与 Consumer 消费速率
  3. 常见原因:
     - Consumer 实例数量不足
     - Consumer 处理逻辑慢(外部 API 调用 / DB 查询慢)
     - Consumer 频繁 Rebalance 导致消费暂停
     - 某些分区 Lag 高但其他分区 Lag 低(热点分区)

  4. 解决:
     - 增加 Consumer 实例(不超过分区数)
     - 优化 Consumer 处理逻辑
     - 启用 Cooperative Rebalance 减少 Rebalance 暂停时间
     - 增加 max.poll.records 减少 Poll 频率
     - 热点分区通过修改 Key 策略处理

场景四:Producer 请求超时

症状:Producer 端报 TimeoutException / request timed out

排查步骤:
  1. 检查 Broker 端 produce 请求延迟
     kafka_network_request_metrics_totaltimems_produce
  2. 检查 Broker GC 暂停时间
     jvm_gc_pause_seconds
  3. 检查磁盘 I/O Util
  4. 常见原因:
     - acks=all 且 ISR 中有 Follower 延迟过高
     - Broker 磁盘 I/O 饱和
     - 网络带宽打满
     - JVM GC 频繁导致 Broker 无响应

  5. 解决:
     - Producer acks 从 all 降为 1(非关键业务)
     - 调整 JVM GC 参数减少暂停
     - 扩容 Broker 或增加网络带宽

场景五:KafkaTopic CR 状态 NotReady

症状:KafkaTopic CR status.conditions 中出现 NotReady

排查步骤:
  1. kubectl describe kafkatopic my-topic
  2. kubectl logs deploy/my-cluster-entity-operator -c topic-operator
  3. 常见原因:
     - Topic Operator 无法连接 Kafka Admin API
     - 分区数或副本因子超过集群容量
     - Topic 名称不符合规范
     - Kafka Broker 端 ACL 限制 Entity Operator 操作

  4. 解决:
     - 验证 Entity Operator 的 TLS 证书未过期
     - 检查 KafkaTopic.spec 参数合法性
     - 确认 Entity Operator 在 superUsers 列表中
30 Kafka on Kubernetes 生产环境最佳实践

答案:

Kafka on Kubernetes 生产部署需覆盖集群规划、资源配置、安全加固、监控告警和运维自动化五个维度。

集群规划

维度建议
K8s 版本>= 1.27(支持 Strimzi 0.39+),优先选择长期支持版本
节点隔离Kafka Broker 专用节点组(NodeSelector / Taint + Toleration)
可用区分布Broker 跨 3 个 AZ 分布(Rack Awareness 配置)
网络 CNICilium / Calico,支持 NetworkPolicy
存储独立的 StorageClass(NVMe SSD 优先),禁用 PVC 自动删除
集群规模单集群分区数 < 200K,单个 Broker 分区数 < 6K

资源配置

spec:
  kafka:
    replicas: 3
    resources:
      requests:
        memory: 8Gi
        cpu: 2
      limits:
        memory: 16Gi
        cpu: 4
    jvmOptions:
      -Xms: 6144m
      -Xmx: 6144m         # heap = limit * 0.5(剩余给 Page Cache)
      -XX:+UseG1GC
      -XX:MaxGCPauseMillis: 20
      -XX:G1HeapRegionSize: 32m
      -XX:MetaspaceSize: 128m
      -XX:MaxMetaspaceSize: 256m
      -XX:+ExitOnOutOfMemoryError
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 500Gi
          class: nvme-ssd
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: strimzi.io/name
                      operator: In
                      values:
                        - my-cluster-kafka
                topologyKey: kubernetes.io/hostname
        tolerations:
          - key: dedicated
            operator: Equal
            value: kafka
            effect: NoSchedule
      serviceAccount:
        metadata:
          annotations:
            iam.gke.io/gcp-service-account: [email protected]

安全加固清单

安全维度措施
网络TLS 加密所有 Listener,NetworkPolicy 仅允许必要端口入站
认证SASL SCRAM-SHA-512 或 mTLS,禁用 Plain Listener
授权KafkaUser ACL 最小权限,Entity Operator 配置 superUsers
存储PVC 加密(StorageClass 开启 Encryption),Secret 使用 External Secrets Operator
审计启用 Kafka Authorizer 日志,汇聚至集中日志平台
镜像使用官方 Strimzi 镜像,定期扫描 CVE 并更新

运维自动化

# Cruise Control 异常检测
spec:
  cruiseControl:
    config:
      anomaly.detection.goals: >-
        com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
        com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal        
      failed.brokers.zk.session.timeout.ms: 15000
      metric.anomaly.finder.class: >-
        com.linkedin.kafka.cruisecontrol.detector.NoopMetricAnomalyFinder        
      goal.violation.detection.interval.ms: 300000
运维场景自动化方案
滚动升级Strimzi Operator 自动滚动升级,可配置 inter.broker.protocol.version 逐步升级
集群扩缩容修改 kafka.replicas 后 Operator 自动处理,配合 Cruise Control Rebalance
证书轮换Strimzi CA 自动轮换,证书过期前 30 天自动更新
分区均衡KafkaRebalance CRD 或 Cruise Control Anomaly Detector 定时均衡
备份恢复Velero 定时备份,MM2 跨集群灾备
容量规划Prometheus 指标 + 线性回归预测磁盘/分区增长趋势

关键告警规则

groups:
  - name: kafka-critical
    rules:
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replica_manager_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka Under Replicated Partitions = XQOPEN $value XQCLOSE"

      - alert: KafkaUnderMinISR
        expr: kafka_server_replica_manager_underminisrpartitioncount > 0
        for: 2m
        labels:
          severity: critical

      - alert: KafkaBrokerDown
        expr: kafka_server_replicamanager_leadercount < 1
        for: 2m
        labels:
          severity: critical

      - alert: KafkaActiveControllerCount
        expr: sum(kafka_controller_activecontroller) != 1
        for: 1m
        labels:
          severity: critical

      - alert: KafkaHighConsumerLag
        expr: kafka_consumergroup_group_lag > 100000
        for: 10m
        labels:
          severity: warning

      - alert: KafkaDiskUsageHigh
        expr: (kafka_log_log_size / kafka_log_log_size_limit) > 0.85
        for: 5m
        labels:
          severity: warning

版本升级策略

Strimzi 版本升级流程:
  1. 阅读 Strimzi 版本升级说明(Upgrade Guide)
  2. 备份 Kafka CRD 和配置
  3. 升级 Strimzi Cluster Operator(Helm / OLM)
  4. 升级 CRD 定义
  5. 升级 Kafka CR 中的 version 字段(Kafka 版本)
  6. Operator 自动触发 Broker 滚动升级
  7. 验证集群状态和客户端兼容性

跨大版本升级原则:
  - 不跳版本升级(如 3.5 → 3.6 → 3.7)
  - 先升级 inter.broker.protocol.version,再升级 log.message.format.version
  - 客户端版本与 Broker 版本的兼容性区间为 ±1 大版本