Kafka on Kubernetes 面试题
30 道题- 分类
- 中间件
- 题目数
- 30 道
1 Kafka on Kubernetes 的部署方案对比(Strimzi / Confluent Operator / Banzaicloud Koperator)
答案:
Kubernetes 上部署 Kafka 的主流方案包括 Strimzi、Confluent for Kubernetes(CFK)和 Banzaicloud Koperator,三者均通过 Operator 模式实现声明式管理与自动化运维。
| 维度 | Strimzi | Confluent for Kubernetes | Banzaicloud Koperator |
|---|---|---|---|
| 开源协议 | Apache 2.0(CNCF Sandbox) | Confluent Community License | Apache 2.0 |
| Kafka 发行版 | Apache Kafka | Confluent Platform(含企业特性) | Apache Kafka |
| ZooKeeper 管理 | Operator 一并管理 ZK 集群 | Operator 管理 ZK | 需用户自行管理 |
| KRaft 支持 | Strimzi 0.34+ 支持 KRaft 模式 | 完整支持 | 不支持 |
| Topic 管理 | Topic Operator(KafkaTopic CRD) | 原生集成 | 通过 KafkaTopic CRD |
| 用户管理 | User Operator(KafkaUser CRD) | Confluent RBAC | 需手动管理 |
| MirrorMaker 2 | KafkaMirrorMaker2 CRD | Confluent Replicator | 不支持 |
| Connect 管理 | KafkaConnect CRD | Confluent Control Center 集成 | 不支持 |
| Cruise Control | 内置集成 | 内置集成 | 不支持 |
| Schema Registry | 需额外部署 | 原生集成 | 不支持 |
| 监控 | JMX Exporter + Prometheus annotations | Confluent Control Center | 需自行集成 |
| 网络 | TLS/SCRAM-SHA-512/OAuth2/mTLS | mTLS/SASL/SSO | TLS/SASL |
| 存储 | PVC/JBOD/Tiered Storage | PVC | PVC |
| 升级策略 | 支持滚动升级/手动控制 | 支持滚动升级 | 有限支持 |
| 适用场景 | 社区版 Kafka,CNCF 生态,中小规模 | 企业合规,Confluent 生态,大规模 | 简单部署场景 |
Strimzi 优势:CNCF 生态兼容性最佳,CRD 覆盖全生命周期,社区活跃度高。Confluent Operator 优势:企业级功能(RBAC、Control Center、Schema Registry、Tiered Storage)开箱即用,商业支持完善。Banzaicloud Koperator 优势:轻量,部署简单。
2 Strimzi Operator 的架构与核心 CRD
答案:
Strimzi Operator 采用分层架构,通过一组 Kubernetes CRD 描述 Kafka 集群、Topic、用户和数据集成组件,Operator 负责将 CRD 描述转换为实际的 StatefulSet、Deployment、Service 等 Kubernetes 原生资源。
核心 CRD 列表:
| CRD | 职责 | 对应 K8s 资源 |
|---|---|---|
Kafka | 定义 Kafka 集群拓扑,含 Broker、ZK/KRaft、存储、网络、认证配置 | StatefulSet、Service、ConfigMap、PVC |
KafkaTopic | 声明式管理 Topic 的分区数、副本因子、配置 | 无(通过 Kafka Admin API 创建) |
KafkaUser | 定义客户端用户及 ACL 权限 | Secret(存储凭证) |
KafkaConnect | 部署和管理 Kafka Connect 集群 | Deployment/StatefulSet、Service |
KafkaConnector | 管理单个 Connector 实例 | 无(通过 Connect REST API) |
KafkaMirrorMaker2 | 部署 Mirrormaker 2 跨集群复制 | Deployment、Service |
KafkaBridge | 部署 HTTP Bridge,支持 HTTP 协议访问 Kafka | Deployment、Service |
KafkaRebalance | 触发 Cruise Control 执行分区均衡 | 无(通过 Cruise Control API) |
KafkaNodePool | 定义 Broker 节点池(支持异构 Broker) | StatefulSet |
架构工作流:
Kafka CR 创建 → Cluster Operator Watch →
创建 ZK StatefulSet(非 KRaft 模式)→
创建 Broker StatefulSet →
Entity Operator(Topic Operator + User Operator)启动 →
监听 KafkaTopic / KafkaUser CRD →
通过 Admin API 操作 Kafka 集群
Strimzi Operator 控制平面部署在 kafka 命名空间,支持单命名空间或集群级别监听。Cluster Operator 通过 STRIMZI_NAMESPACE 环境变量控制监听范围。
3 Strimzi 的 Kafka 集群 StatefulSet 部署
答案:
Strimzi 将每个 Kafka Broker 和 ZooKeeper 节点映射为 StatefulSet 中的 Pod,利用 StatefulSet 的有序部署、稳定网络标识和持久化存储实现有状态服务的 Kubernetes 化管理。
StatefulSet 关键配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
version: 3.7.0
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: nodeport
tls: true
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
- id: 1
type: persistent-claim
size: 100Gi
deleteClaim: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: strimzi.io/name
operator: In
values:
- my-cluster-kafka
topologyKey: kubernetes.io/hostname
resources:
requests:
memory: 8Gi
cpu: 2
limits:
memory: 16Gi
cpu: 4
StatefulSet 网络标识:每个 Broker 分配稳定的 DNS 名称 my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc.cluster.local,Broker ID 与 Pod 序号对应。跨集群重启后 Broker ID 保持不变。
PodAntiAffinity 与 Rack Awareness:通过 podAntiAffinity 将不同 Broker Pod 调度到不同 K8s 节点,配合 rack 配置实现机架感知副本分布,确保分区副本跨物理故障域分布。
JBOD 存储:Strimzi 支持 JBOD(Just a Bunch of Disks)配置,每个 Broker 可挂载多个独立 PVC,Kafka 日志目录分布在不同卷上,提升 I/O 并行度。
4 Strimzi 的 Entity Operator(Topic Operator + User Operator)
答案:
Entity Operator 是 Strimzi 中负责 Topic 和 User 生命周期管理的子组件,以 Sidecar 或独立 Deployment 形式运行,通过 Kafka Admin API 与 Kafka 集群交互。
Entity Operator 架构:
| 子组件 | CRD | 职责 | 通信方式 |
|---|---|---|---|
| Topic Operator | KafkaTopic | 管理 Topic 创建、分区数、副本因子、配置更新 | Kafka Admin API |
| User Operator | KafkaUser | 管理用户凭证生成、ACL 绑定、配额设置 | Kafka Admin API |
| TLS Sidecar | — | 管理 TLS 证书轮换与推送 | 文件系统共享卷 |
Topic Operator 工作流程:
KafkaTopic CR 创建/更新 → Topic Operator Watch 事件 →
反序列化 CR → 校验分区数/副本因子/配置 →
调用 AdminClient.createTopics() / AdminClient.describeConfigs() →
对比期望状态与当前状态 → 执行变更 → 更新 KafkaTopic.status
User Operator 工作流程:
KafkaUser CR 创建/更新 → User Operator Watch 事件 →
生成凭证(SCRAM-SHA-512 密码 / TLS 证书)→
存储到 Secret → 调用 Admin API 创建/更新用户 →
绑定 ACL 规则 → 设置 Quota → 更新 KafkaUser.status
Topic Operator 双向同步策略:
| 策略 | 行为 |
|---|---|
UNIDIRECTIONAL(单向) | 仅从 KafkaTopic CR → Kafka 集群同步,集群内手工变更不被 CR 感知 |
BIDIRECTIONAL(双向) | KafkaTopic CR ↔ Kafka 集群双向同步,集群内手工变更会更新 CR 状态 |
常见故障处理:Topic Operator 与 Kafka 集群网络中断时,CR 状态进入 NotReady,连接恢复后自动重建 ZooWatcher 并重新同步。证书过期时 TLS Sidecar 自动触发证书轮换。
5 Kafka Broker 的 Rack Awareness 在 K8s 上的实现
答案:
Rack Awareness 确保 Kafka 分区的不同副本分布在不同的故障域(K8s 节点 / 可用区)上,避免单个节点或 AZ 故障导致数据不可用。
K8s 上实现 Rack Awareness 的三种方式:
| 方式 | 配置方法 | 适用场景 |
|---|---|---|
| Kubernetes Node Label | rack.topologyKey: topology.kubernetes.io/zone | 多 AZ 集群 |
| 手动指定 Rack ID | template.pod.topologySpreadConstraints | 精细化控制 |
| Pod Anti-Affinity | affinity.podAntiAffinity | 确保不同 Broker 在不同 Node |
Strimzi Rack Awareness 配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
replicas: 3
rack:
topologyKey: topology.kubernetes.io/zone
listeners:
- name: plain
port: 9092
type: internal
template:
pod:
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchExpressions:
- key: strimzi.io/name
operator: In
values:
- my-cluster-kafka
topologyKey: kubernetes.io/hostname
工作原理:Strimzi 读取每个 Broker Pod 所在 K8s 节点的 topology.kubernetes.io/zone Label,将其注入 Kafka 的 broker.rack 配置。Kafka Controller 在分区分配时确保同一分区的不同副本落在不同 AZ 的 Broker 上。
Pod Topology Spread Constraints 实现细粒度分布:
template:
pod:
topologySpreadConstraints:
- maxSkew: 1
topologyKey: topology.kubernetes.io/zone
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
strimzi.io/name: my-cluster-kafka
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
strimzi.io/name: my-cluster-kafka
Rack Awareness 与分区副本分布规则:
- 分区副本分布在
min(rackCount, replicationFactor)个机架上。 - ISO(In-Sync Replica)集合优先跨 Rack 分布。
- Rack 级故障时,剩余 Rack 上的 Follower 副本自动选举为新 Leader。
6 Kafka Topic 的分区与副本配置
答案:
Kafka Topic 的分区和副本配置直接决定数据分布、并行消费能力和容错能力。
KafkaTopic CRD 示例:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: orders-topic
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 12
replicas: 3
config:
retention.ms: 604800000 # 7 天保留
retention.bytes: 107374182400 # 100GB 保留
segment.bytes: 1073741824 # 1GB 每段
min.insync.replicas: 2
cleanup.policy: delete
compression.type: producer
max.message.bytes: 10485760 # 10MB
message.timestamp.type: CreateTime
分区数设计原则:
| 场景 | 建议分区数 | 依据 |
|---|---|---|
| 低吞吐(< 1 MB/s) | 3-6 | 最小并行度 |
| 中吞吐(1-50 MB/s) | 6-12 | 平衡并行度与开销 |
| 高吞吐(50-200 MB/s) | 12-30 | 充分利用并行消费 |
| 超高吞吐(> 200 MB/s) | 30-100 | 线性扩展 |
分区数约束:分区数不可减少(Kafka 自身限制),生产环境建议预留 20%-30% 增长空间。过多分区增加文件句柄开销和 Controller 选举耗时。
副本因子配置:
| 副本因子 | 容错能力 | 适用场景 |
|---|---|---|
| 1 | 0 节点故障 | 开发环境 |
| 2 | 1 节点故障 | 非关键业务 |
| 3 | 2 节点故障(配合 min.insync.replicas=2) | 生产标准 |
| 4-5 | 3+ 节点故障 | 金融级高可用 |
存储容量估算公式:
单分区日均存储 = 日均消息数 × 平均消息大小 × 副本因子 × (1 + segment索引开销≈10%) × 保留天数
总存储 = 单分区存储 × 分区数 + 预留 30% buffer
7 Kafka 的 ISR(In-Sync Replica)机制与最小 ISR
答案:
ISR 是 Kafka 高可用和数据一致性的核心机制,定义了与 Leader 保持同步的 Follower 副本集合。
ISR 判断条件:
Follower 进入 ISR 条件:
- Follower 在 replica.lag.time.max.ms(默认 30s)内向 Leader 发送过 Fetch 请求
- Follower 的 LEO(Log End Offset)与 Leader 的差异在 replica.lag.max.messages 范围内
Follower 踢出 ISR 条件:
- 超过 replica.lag.time.max.ms 未向 Leader 发起 Fetch 请求
- Follower LEO 落后 Leader LEO 超过阈值
ISR 收缩与扩展流程:
Leader 维护 ISR 集合 →
Follower Fetch 请求到达 → 更新 lastCaughtUpTimeMs →
定期检查:now - lastCaughtUpTimeMs > replica.lag.time.max.ms →
从 ISR 移除 → 写入 ZooKeeper / KRaft →
通知 Controller 更新 ISR 元数据 → Propagate 至所有 Broker →
Follower 追上后重新加入 ISR
min.insync.replicas 配置:
| minISR | 容错含义 | 风险 |
|---|---|---|
| 1 | Leader 单独在 ISR 即可写入 | 数据丢失,Leader 故障后数据不可恢复 |
| 2(推荐) | 至少 Leader + 1 个 Follower 在 ISR 才能写入 | 可容忍 1 个 ISR 节点故障 |
| replicas = 3, minISR = 3 | 所有副本均需在 ISR | 任一节点故障中断写入 |
写入一致性保障:
Producer acks=all + min.insync.replicas=2 + replicationFactor=3
→ 消息写入 Leader → Leader 等待 Follower 确认 →
至少 2 个副本(含 Leader)持久化成功 → Producer 收到 ACK
ISR 抖动处理:broker 短暂 GC pause 或网络抖动导致 Follower 频繁进出 ISR,表现为 UnderMinIsr 指标波动。通过调大 replica.lag.time.max.ms(如 60s)和确保 Broker 有充足的内存避免 GC 过频来缓解。
8 Kafka 的 Leader 选举与 Controller Broker
答案:
Controller 是 Kafka 集群中负责管理分区和副本状态的 Broker,通过 ZooKeeper(或 KRaft 中的 Metadata Quorum)实现选举。
Controller 职责:
| 职责 | 说明 |
|---|---|
| 分区 Leader 选举 | Broker 宕机时为受影响分区选举新 Leader |
| ISR 管理 | 维护每个分区的 ISR 集合变更 |
| Topic 管理 | Topic 创建、删除、分区扩展 |
| 副本分配 | 新分区创建时的副本分布策略 |
| 元数据广播 | 向所有 Broker 广播集群元数据变更 |
| Preferred Leader 选举 | 触发和完成 Preferred Replica Leader 选举 |
Controller 选举流程(ZooKeeper 模式):
Broker 启动 → 尝试创建 /controller 临时节点 →
创建成功 → 成为 Controller,监听 ZooKeeper 变更 →
创建失败 → 监听 /controller 节点 →
当前 Controller 宕机 → ZooKeeper 临时节点删除 →
剩余 Broker 收到 Watch 通知 → 重新竞争创建 /controller →
新 Controller 当选
Controller 分区 Leader 选举策略:
| 策略 | 行为 |
|---|---|
| OfflinePartitionLeaderElectionStrategy | 仅当没有 Leader 时选举 |
| PreferedReplicaLeaderElectionStrategy | 优先选举 Preferred Replica 为 Leader |
| NoOpLeaderElectionStrategy | 不执行自动选举 |
Broker 宕机检测 → ZooKeeper 临时节点超时 →
Controller 收到 Broker 变更通知 →
遍历该 Broker 所有 Leader 分区 →
从 ISR 中选取第一个存活 Broker 作为新 Leader →
更新 ZooKeeper /brokers/topics/<topic>/partitions/<pid>/state →
向所有 Broker 广播 LeaderAndIsr 请求 →
各 Broker 更新本地元数据缓存
Controller 故障影响与优化:
- Controller 切换期间(数秒内),分区 Leader 选举、Topic 创建/删除等管理操作中断。
controlled.shutdown.enable=true时 Broker 正常下线由 Controller 优雅迁移分区,避免 Leader 批量切换。- KRaft 模式下 Controller 与 Broker 角色分离,Controller 独立为 Quorum 控制器节点。
9 Kafka 的 Exactly-Once 语义与事务
答案:
Kafka 的 Exactly-Once 语义(EOS)通过幂等生产者和事务机制实现消息不丢失、不重复的处理保证。
三种消息传递语义:
| 语义 | 机制 | 实现 |
|---|---|---|
| At-Most-Once | acks=0,发送即忘 | 可能丢消息 |
| At-Least-Once | acks=1/all,重试机制 | 可能重复 |
| Exactly-Once | 幂等 + 事务 | 不丢不重复 |
幂等生产者(Idempotent Producer):
Producer 初始化 → 向 Broker 请求 Producer ID(PID)→
Broker 分配 PID + Epoch → Producer 每条消息附带:
(PID, Epoch, SequenceNumber) →
Broker 按 (PID, Partition) 维护已提交的最大 SequenceNumber →
收到消息 → 检查:
Sequence == lastSeq+1 → 接受
Sequence <= lastSeq → 重复,丢弃,返回 ACK
Sequence > lastSeq+1 → 乱序,抛出 OutOfOrderSequenceException
# 幂等生产者配置
enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647
事务(Transactions):
Producer 初始化事务 → initTransactions() →
Broker 分配 TransactionalId →
beginTransaction() →
发送消息(附加 TransactionalId)→
消息写入分区但不向消费者可见 →
sendOffsetsToTransaction() →
commitTransaction() / abortTransaction() →
Broker 写入 Commit/Abort Marker →
消费者读取到 Marker 后过滤事务消息
EOS Consumer 配合:isolation.level=read_committed,仅读取已提交的事务消息,过滤未提交和已中止的事务消息。
EOS 性能代价:事务写入增加约 20%-30% 延迟,提高 Producer 内存占用,transaction.state.log 内部 Topic 产生额外网络和磁盘 I/O。
10 Kafka 的消息压缩(Snappy / LZ4 / GZip / Zstd)
答案:
Kafka 支持 Producer 端压缩后 Broker 原样存储的压缩策略,减少网络传输和磁盘占用。
压缩算法对比:
| 算法 | 压缩率 | 压缩速度 | 解压速度 | 适用场景 |
|---|---|---|---|---|
| Snappy | 中等(2x) | 极快 | 极快 | 低延迟场景,平衡选择 |
| LZ4 | 中等(2x) | 最快 | 最快 | 低延迟、高吞吐场景首选 |
| GZip | 高(4x-5x) | 慢 | 中等 | 存储密集型、冷数据归档 |
| Zstd | 最高(4x-6x) | 中等 | 快 | 高压缩率需求,推荐生产使用 |
| uncompressed | 1x | 无开销 | 无开销 | 实时性要求极高、消息本身已压缩 |
压缩配置层级:
# Producer 端压缩(推荐)
compression.type=zstd # none / gzip / snappy / lz4 / zstd
# Broker 端压缩策略
compression.type=producer # producer(保留原始压缩) / uncompressed(强制解压)
# Topic 级别压缩覆盖
# 通过 KafkaTopic CRD 的 config 指定
压缩在 Broker 端的处理:
Producer 压缩消息 → 封装为 MessageSet → 发送到 Broker →
Broker.compression.type=producer → 原样写入 Page Cache →
批量刷盘到日志段文件 →
Consumer Fetch 请求 → Broker 直接发送压缩数据 →
Consumer 解压消息集
Strimzi 中的压缩配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
config:
compression.type: producer # Broker 端保持 Producer 压缩
message.max.bytes: 10485760 # 单条消息最大 10MB
Zstd 生产推荐理由:压缩率最高(减少磁盘和网络成本),解压速度快(对 Consumer 几乎无感知),支持字典压缩进一步提升小消息压缩率。
11 Kafka 的日志保留策略(Time / Size / Compaction)
答案:
Kafka 的日志保留策略通过 cleanup.policy 和配套参数控制消息在 Broker 上的生命周期。
三种清理策略:
| 策略 | cleanup.policy | 触发条件 | 适用场景 |
|---|---|---|---|
| 基于时间删除 | delete | retention.ms / retention.minutes / retention.hours | 日志、事件流、时序数据 |
| 基于大小删除 | delete | retention.bytes | 有限存储空间 |
| 日志压缩 | compact | min.cleanable.dirty.ratio | KV 状态存储、CDC 数据、配置快照 |
Strimzi KafkaTopic 保留策略配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: events-topic
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 12
replicas: 3
config:
# 基于时间 + 大小双重删除
cleanup.policy: delete
retention.ms: 604800000 # 7 天
retention.bytes: 107374182400 # 100GB / 分区
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: user-state-compacted
labels:
strimzi.io/cluster: my-cluster
spec:
partitions: 6
replicas: 3
config:
cleanup.policy: compact
min.cleanable.dirty.ratio: 0.5
delete.retention.ms: 86400000 # 删除标记保留 1 天
segment.ms: 86400000 # 每天产生新段
日志清理时机与机制:
Log Cleaner 线程定期扫描 → 选择脏比率最高的 Log →
逐段读取 → 构建 Offset Map(每个 Key 的最新 Offset)→
保留最新 Offset 的消息,删除旧消息 → 合并为干净的新段 →
交换新旧段 → 更新索引
保留策略参数优先级:
| 优先级 | 规则 |
|---|---|
| 最高 | 已确定为 Active Segment 的日志段不删除 |
| 高 | min.compaction.lag.ms / max.compaction.lag.ms 内的段不参与压缩 |
| 中 | retention.ms 和 retention.bytes 同时设置时先触发的优先生效 |
| 低 | Broker 级别 log.retention.* 作为默认值,Topic 级别配置覆盖 |
生产环境保留策略最佳实践:
| 数据类型 | 推荐策略 | 保留周期 | 原因 |
|---|---|---|---|
| 业务事件流 | delete + time | 7-30 天 | 事件有时效性 |
| 状态快照(CDC) | compact | 永久 | 仅需每个 Key 的最新值 |
| 告警事件 | delete + size | 100GB | 存储空间受限 |
| 审计日志 | delete + time | 90 天+ | 合规要求 |
12 Kafka 的消费者组(Consumer Group)Rebalance 机制
答案:
Consumer Group Rebalance 是 Kafka 在 Consumer 成员变化时重新分配分区所有权的协调过程。
触发 Rebalance 的条件:
| 触发条件 | 说明 |
|---|---|
| Consumer 加入或离开 Group | Consumer 启动、正常退出、崩溃(Session 超时) |
| Topic 分区数增加 | 新增分区需分配给 Consumer |
| 订阅的 Topic 变更 | Consumer 通过正则订阅匹配到新 Topic |
max.poll.interval.ms 超时 | Consumer 两次 poll 间隔超过阈值 |
Rebalance 协议阶段:
Phase 1: JoinGroup
Consumer → Group Coordinator: JoinGroupRequest(memberId, protocols)
Coordinator 选第一个 Consumer 为 Group Leader
Coordinator → Group Leader: JoinGroupResponse(所有 Consumer 信息)
Coordinator → 其他 Consumer: JoinGroupResponse(成员 ID)
Phase 2: SyncGroup
Group Leader 执行分配策略 → 生成 Partition Assignments →
Group Leader → Coordinator: SyncGroupRequest(分配方案)
Coordinator → 所有 Consumer: SyncGroupResponse(各 Consumer 的分区列表)
Consumer 收到分配方案 → 清空或保持未提交的 Offset → 开始消费
Rebalance 策略对比:
| 策略 | 算法 | 适用场景 |
|---|---|---|
| RangeAssignor | 按 Topic 分区范围连续分配 | 默认策略,平衡性一般 |
| RoundRobinAssignor | 所有分区轮询均匀分配 | Consumer 订阅 Topic 完全相同 |
| StickyAssignor | 尽量保持原有分配,减少分区迁移 | 生产推荐,减少无意义迁移 |
| CooperativeStickyAssignor | 增量式 Rebalance,不触发 Stop-The-World | 生产首选(Kafka 2.4+) |
Cooperative Rebalance(增量式重平衡):
与传统 Eager Rebalance 差异:
- 不触发全局 Stop-The-World
- Consumer 不必在 Rebalance 期间暂停所有分区消费
- 仅迁移需要重新分配的分区
- 未被重新分配的分区继续正常消费
Consumer 配置:
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Rebalance 优化措施:
# 延长 Session 超时,避免短暂网络波动触发 Rebalance
session.timeout.ms=30000 # Consumer 心跳超时(默认 45s)
# 延长心跳间隔,减少心跳开销
heartbeat.interval.ms=3000 # Consumer 向 Coordinator 发送心跳间隔
# 增加 Poll 间隔上限,防止慢速处理触发 Rebalance
max.poll.interval.ms=600000 # 两次 poll 最大间隔(默认 5min)
# 使用 Cooperative Sticky Rebalance
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# 增加 poll 记录数,减少 poll 调用频率
max.poll.records=500
13 Kafka 的 Group Coordinator 与 Offset 管理
答案:
Group Coordinator 是 Broker 端管理 Consumer Group 的核心组件,负责 Consumer 成员管理、Offset 提交和 Rebalance 协调。
Coordinator 定位机制:
Consumer Group ID →
hash(groupId) % __consumer_offsets 分区数 → 目标分区 →
该分区 Leader 所在 Broker = Group Coordinator
Offset 存储架构(Kafka 0.9+):
旧架构(ZooKeeper 存储 Offset):
Consumer → ZK 直接写入 Offset
缺点:ZK 不适合高频写入,OOM 风险
新架构(__consumer_offsets Topic 存储 Offset):
Consumer → Group Coordinator → __consumer_offsets Topic
存储格式:Key = <GroupId, Topic, Partition>
Value = <Offset, Metadata, Timestamp>
压缩策略:compact(每个分区仅保留最新 Offset)
Offset 提交模式:
| 模式 | 配置 | 行为 | 风险 |
|---|---|---|---|
| 自动提交 | enable.auto.commit=true | 按 auto.commit.interval.ms 周期提交 | Consumer 故障后可能重复消费 |
| 同步手动提交 | consumer.commitSync() | 阻塞等待提交完成 | 降低吞吐 |
| 异步手动提交 | consumer.commitAsync() | 非阻塞提交,回调处理结果 | 网络异常时可能丢提交 |
| 事务内提交 | EOS 模式 | Offset 与消息在同一事务中原子提交 | 仅事务 Producer+Consumer 场景 |
Coordinator 故障处理:
Consumer 心跳超时 → Group Coordinator 移除该 Consumer →
触发 Rebalance → 但 Coordinator 自身可能也在故障 →
Consumer 收到 NOT_COORDINATOR 错误 →
重新 FindCoordinator Request → 定位新 Coordinator →
重新 JoinGroup
Consumer Offset 重置策略:
| 策略 | 行为 |
|---|---|
latest(默认) | 从最新位置开始(忽略历史消息) |
earliest | 从最早可用偏移量开始(重消费历史) |
none | 未找到 Offset 时抛出异常 |
Group Coordinator 在 KRaft 模式下的变化:Coordinator 由 KRaft Quorum 中的活跃 Controller 承担,Offset 仍存储在 __consumer_offsets Topic 中。
14 Kafka 的监控指标(JMX Exporter + Prometheus + Grafana)
答案:
Kafka on Kubernetes 的监控体系以 JMX 指标采集为基础,通过 JMX Exporter 转换为 Prometheus 格式,由 Grafana 可视化展示。
Strimzi 监控配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
name: kafka-metrics
key: kafka-metrics-config.yml
template:
pod:
metadata:
annotations:
prometheus.io/scrape: "true"
prometheus.io/port: "9404"
JMX Exporter 配置(kafka-metrics-config.yml):
lowercaseOutputName: true
lowercaseOutputLabelNames: true
rules:
# Broker 核心指标
- pattern: kafka.server<type=BrokerTopicMetrics, name=(BytesInPerSec|BytesOutPerSec)><>OneMinuteRate
name: kafka_server_broker_topic_metrics_$1
type: GAUGE
- pattern: kafka.server<type=BrokerTopicMetrics, name=(MessagesInPerSec)><>OneMinuteRate
name: kafka_server_broker_topic_metrics_$1
type: GAUGE
# 网络请求
- pattern: kafka.network<type=RequestMetrics, name=(TotalTimeMs|RequestQueueTimeMs), request=(Produce|Fetch|FetchConsumer|FetchFollower)><>Count
name: kafka_network_request_metrics_$2_$1
labels:
request: "$3"
# ISR 与副本
- pattern: kafka.server<type=ReplicaManager, name=(UnderReplicatedPartitions|UnderMinIsrPartitionCount)><>Value
name: kafka_server_replica_manager_$1
- pattern: kafka.server<type=ReplicaManager, name=(IsrShrinksPerSec|IsrExpandsPerSec)><>OneMinuteRate
name: kafka_server_replica_manager_$1
# Producer 请求延迟
- pattern: kafka.network<type=RequestMetrics, name=TotalTimeMs, request=(Produce|FetchConsumer)><>(\w+)
name: kafka_network_$3_$1_$2
labels:
request: "$3"
Prometheus ServiceMonitor:
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: kafka-cluster-metrics
labels:
release: prometheus
spec:
selector:
matchLabels:
strimzi.io/cluster: my-cluster
strimzi.io/kind: Kafka
endpoints:
- port: tcp-prometheus
interval: 15s
scrapeTimeout: 10s
关键监控指标分类与告警阈值:
| 类别 | 关键指标 | 告警阈值 |
|---|---|---|
| 吞吐 | kafka_server_broker_topic_metrics_bytesinpersec | > 80% 磁盘带宽上限 |
| 延迟 | kafka_network_request_metrics_totaltimems_produce | P99 > 100ms |
| ISR | kafka_server_replica_manager_underreplicatedpartitions | > 0 持续 5 分钟 |
| 消费滞后 | kafka_consumergroup_group_lag | > 10000 或增长率 > 1000/min |
| 磁盘使用 | kafka_log_log_size | > 80% 磁盘容量 |
| Controller | kafka_controller_controllerstate | 不等于 1(Broker 不是 Controller 但值 = 0 OK) |
| 网络 | kafka_network_processor_idle_percent | < 0.3 持续 10 分钟 |
| GC | jvm_gc_pause_seconds | P99 > 500ms |
Grafana Dashboard 分层:
- Overview Dashboard:集群吞吐、延迟、活跃 Controller、Broker 数量
- Broker Dashboard:单 Broker 的请求速率、网络 I/O、磁盘 I/O、GC 行为
- Topic/Partition Dashboard:分区级别读写速率、大小、ISR 状态
- Consumer Group Dashboard:消费滞后、提交速率、Rebalance 事件
15 Kafka 的性能调优(Producer / Consumer / Broker 参数)
答案:
Kafka 性能调优需从 Producer、Broker、Consumer 三个维度分别优化。
Producer 端调优:
# 吞吐优先
batch.size=131072 # 批量大小 128KB
linger.ms=5 # 等待 5ms 攒批
compression.type=zstd # 压缩算法
buffer.memory=134217728 # 发送缓冲区 128MB
max.in.flight.requests.per.connection=5
acks=1 # Leader 确认即可
# 可靠性优先
acks=all # 所有 ISR 确认
enable.idempotence=true # 幂等生产者
max.in.flight.requests.per.connection=5
retries=2147483647 # 无限重试
delivery.timeout.ms=120000 # 超时 2 分钟
Broker 端调优:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
spec:
kafka:
config:
# 网络与 I/O
num.network.threads: 8 # 网络线程数(处理网络请求)
num.io.threads: 16 # I/O 线程数(处理磁盘 I/O)
socket.send.buffer.bytes: 1048576
socket.receive.buffer.bytes: 1048576
socket.request.max.bytes: 104857600
# Page Cache 与刷盘
log.flush.interval.messages: 9223372036854775807 # 不基于消息数刷盘
log.flush.interval.ms: 9223372036854775807 # 不基于时间刷盘(依赖 OS)
# 日志段与索引
log.segment.bytes: 1073741824 # 1GB 段
log.index.interval.bytes: 4096 # 稀疏索引密度
# 副本机制
num.replica.fetchers: 4 # Follower 拉取线程数
replica.fetch.max.bytes: 10485760
replica.socket.timeout.ms: 30000
# 压缩
compression.type: producer
# Leader 均衡
auto.leader.rebalance.enable: true
leader.imbalance.per.broker.percentage: 10
leader.imbalance.check.interval.seconds: 300
resources:
requests:
memory: 8Gi
cpu: 2
limits:
memory: 16Gi
cpu: 4
jvmOptions:
-Xms: 6g
-Xmx: 6g
-XX:+UseG1GC
-XX:MaxGCPauseMillis: 20
-XX:InitiatingHeapOccupancyPercent: 35
-XX:+DisableExplicitGC
Consumer 端调优:
# 吞吐优先
fetch.min.bytes=1048576 # 每次 Fetch 最少拉取 1MB
fetch.max.wait.ms=500 # 等待 500ms 攒批
max.partition.fetch.bytes=10485760 # 每分区最大 10MB
max.poll.records=500 # 每次 Poll 最多 500 条
# 延迟优先
fetch.min.bytes=1 # 有数据就返回
fetch.max.wait.ms=50 # 最多等待 50ms
# Rebalance 优化
session.timeout.ms=30000
heartbeat.interval.ms=3000
max.poll.interval.ms=600000
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
# Offset 管理
enable.auto.commit=false # 手动提交,避免自动提交带来的重复消费
JVM 与 OS 调优:
| 调优项 | 配置 | 说明 |
|---|---|---|
| Page Cache | vm.dirty_background_ratio=5 | 后台刷脏页阈值 |
| 文件描述符 | ulimit -n 100000 | Kafka 大量文件需求 |
| Swap | vm.swappiness=1 | 最小化 Swap |
| 磁盘调度 | noop / none | SSD 无 I/O 调度器 |
| 网络调优 | net.core.rmem_max=134217728 | 套接字接收缓冲区 |
16 Kafka 的存储配置(PVC / JBOD / Tiered Storage)
答案:
Strimzi 支持普通 PVC、JBOD 和 Tiered Storage 三种存储模式。
单 PVC 存储:
spec:
kafka:
storage:
type: persistent-claim
size: 500Gi
class: fast-ssd
deleteClaim: false
JBOD 存储:
spec:
kafka:
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 200Gi
class: fast-ssd
deleteClaim: false
- id: 1
type: persistent-claim
size: 200Gi
class: fast-ssd
deleteClaim: false
JBOD 与单 PVC 对比:
| 维度 | 单 PVC | JBOD |
|---|---|---|
| I/O 并行度 | 单卷串行 | 多卷并行 |
| 容量扩展 | 需扩容已有 PVC | 可添加新卷 |
| 日志目录分布 | 单目录 | 多目录轮询写入 |
| 磁盘故障隔离 | 全局故障 | 单卷故障不影响其他卷 |
| 成本 | 低 | 高(多块磁盘) |
Tiered Storage(Strimzi 0.39+,Kafka 3.6+ 特性):
spec:
kafka:
config:
# 启用 Tiered Storage
remote.log.storage.system.enable: true
remote.log.storage.manager.class.path: /opt/kafka/libs/remote-storage-*.jar
remote.log.metadata.manager.impl.class: org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
rlmm.config.remote.log.metadata.topic.replication.factor: 3
# 本地保留
local.retention.ms: 86400000 # 本地保留 1 天
local.retention.bytes: -2 # 本地无大小限制
# 远程保留
retention.ms: 2592000000 # 远程保留 30 天
Tiered Storage 架构:
Active Segments(本地 SSD) <── 热数据,最近 1 天
Closed Segments(远程 S3/MinIO) <── 冷数据,最近 30 天
Deleted Segments <── 过期数据,由 Log Cleaner 线程清除
- 优势:降低本地存储成本,冷数据使用对象存储,支持历史数据回溯。
- 限制:需要 S3 兼容对象存储(MinIO / Ceph RGW / AWS S3),性能会受远程读取延迟影响。
K8s StorageClass 选择:
| StorageClass | 性能 | 适用场景 |
|---|---|---|
fast-ssd(NVMe SSD) | 极高 IOPS | 高吞吐、低延迟生产环境 |
ssd(SATA SSD) | 高 IOPS | 标准生产环境 |
hdd | 低 IOPS | 冷数据、大规模历史存储 |
17 Kafka 的 SSL / TLS 加密与 SASL 认证
答案:
Strimzi 通过 Listener 配置同时支持 TLS 传输加密和多种 SASL 认证机制,管理客户端与 Broker 之间以及 Broker 之间的通信安全。
Listener 类型与安全协议:
| Listener Type | 安全协议 | 用途 |
|---|---|---|
internal + tls: true | TLS 传输加密 | Broker 间通信 |
internal + SASL | TLS + SASL 认证 | 客户端连接认证 |
external + tls: true | TLS | 外部客户端加密连接 |
external + SASL | TLS + SASL | 外部客户端认证连接 |
TLS 加密通信配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
listeners:
- name: tls
port: 9093
type: internal
tls: true
authentication:
type: tls
- name: external
port: 9094
type: nodeport
tls: true
authentication:
type: tls
configuration:
bootstrap:
nodePort: 32094
brokers:
- broker: 0
nodePort: 32095
- broker: 1
nodePort: 32096
- broker: 2
nodePort: 32097
authorization:
type: simple
SASL 认证机制对比:
| 认证类型 | 凭证存储 | 密码安全 | 配置复杂度 | 性能影响 |
|---|---|---|---|---|
| SCRAM-SHA-512 | ZooKeeper / KRaft | 加密哈希 | 低 | 高(需密码哈希) |
| TLS 客户端证书 | Kubernetes Secret | 极高 | 中 | 低 |
| OAuth 2.0 | OAuth Provider | 高 | 高 | 中 |
| Plain | ZooKeeper / KRaft | 明文 | 极低 | 最低 |
KafkaUser SCRAM-SHA-512 配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-user
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
acls:
- resource:
type: topic
name: my-topic
patternType: literal
operations:
- Read
- Write
- Describe
host: "*"
- resource:
type: group
name: my-consumer-group
patternType: literal
operations:
- Read
host: "*"
客户端连接配置(SCRAM-SHA-512):
bootstrap.servers=my-cluster-kafka-tls-bootstrap.kafka.svc:9093
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="my-user" \
password="<password-from-secret>";
ssl.truststore.location=/path/to/ca.crt
ssl.truststore.type=PEM
TLS 证书管理:Strimzi 使用 Cluster Operator 内置 CA 自动签发和管理证书。CA 证书和 Broker 证书存储在 Kubernetes Secret 中,证书过期前自动轮换。也可对接外部 CA(cert-manager)签发证书。
Listener 安全策略矩阵:
plain (9092): No TLS + No Auth → 仅测试环境
tls (9093): TLS + No Auth → 加密但不认证
tls (9093): TLS + SASL → 加密 + 用户认证(生产推荐)
external (9094): TLS + SASL → 外部客户端安全接入
18 Kafka 的 ACL / RBAC 权限控制
答案:
Strimzi 通过 KafkaUser CRD 和 Simple Authorization 实现细粒度的 ACL 访问控制。
KafkaUser ACL 定义:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: app-user
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# 主题读写权限
- resource:
type: topic
name: orders-*
patternType: prefix
operations:
- Read
- Write
- Describe
- DescribeConfigs
host: "*"
# 消费者组权限
- resource:
type: group
name: order-processor
patternType: literal
operations:
- Read
host: "*"
# 集群操作权限
- resource:
type: cluster
name: kafka-cluster
operations:
- Describe
- DescribeConfigs
# 事务 ID 权限
- resource:
type: transactionalId
name: app-tx-*
patternType: prefix
operations:
- Write
- Describe
ACL 资源类型与操作映射:
| 资源类型 | 常用操作 | 说明 |
|---|---|---|
topic | Read / Write / Describe / DescribeConfigs / Alter / AlterConfigs / Create / Delete | Topic 级别控制 |
group | Read / Describe | Consumer Group 控制 |
cluster | Describe / DescribeConfigs / Alter / IdempotentWrite / Create | 集群级别控制 |
transactionalId | Write / Describe | 事务 Producer 控制 |
ACL Pattern 匹配:
| PatternType | 示例 | 匹配 |
|---|---|---|
literal | orders-topic | 精确匹配 orders-topic |
prefix | orders- | 匹配 orders-topic、orders-dlq 等 |
Super User 配置:不受 ACL 限制,用于内部组件(Entity Operator、Cruise Control、MirrorMaker 2)。
spec:
kafka:
authorization:
type: simple
superUsers:
- CN=entity-operator
- CN=cruise-control
- CN=my-mirror-maker-2
ACL 管理技巧:
- 以
KafkaUser为单元,每个微服务应用一个KafkaUser资源。 - 使用
patternType: prefix简化多 Topic 权限管理。 host字段设为*表示不限制 IP,生产环境建议限定 Pod CIDR。- ACL 修改后 User Operator 通过 Admin API 同步到 Kafka,延迟通常在数秒内。
19 Kafka 的 MirrorMaker 2 跨集群复制
答案:
MirrorMaker 2(MM2)基于 Kafka Connect 框架实现,支持集群间的 Topic、Consumer Group Offset 和 ACL 复制。
KafkaMirrorMaker2 CRD:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: dr-mirror
spec:
version: 3.7.0
replicas: 2
connectCluster: target-cluster
clusters:
- alias: source-cluster
bootstrapServers: source-kafka-bootstrap.source-ns.svc:9092
- alias: target-cluster
bootstrapServers: target-kafka-bootstrap.target-ns.svc:9092
mirrors:
- sourceCluster: source-cluster
targetCluster: target-cluster
sourceConnector:
config:
replication.factor: 3
offset-syncs.topic.replication.factor: 3
sync.topic.acls.enabled: true
refresh.topics.enabled: true
refresh.topics.interval.seconds: 60
tasks.max: 4
topicsPattern: "orders.*|payments.*|inventory.*"
groupsPattern: "order-.*|payment-.*"
checkpointConnector:
config:
checkpoints.topic.replication.factor: 3
refresh.groups.enabled: true
refresh.groups.interval.seconds: 60
sync.group.offsets.enabled: true
emit.checkpoints.enabled: true
tasks.max: 2
heartbeatConnector:
config:
heartbeats.topic.replication.factor: 3
tasks.max: 1
MM2 核心 Connector:
| Connector | 功能 |
|---|---|
| MirrorSourceConnector | 从源集群消费消息,写入目标集群(Topic 名加 source-cluster. 前缀) |
| MirrorCheckpointConnector | 同步 Consumer Group Offset(从源集群到目标集群) |
| MirrorHeartbeatConnector | 生成心跳消息,监控复制链路的健康状况 |
MM2 复制语义:
源集群 Topic: orders
目标集群 Topic: source-cluster.orders
复制逻辑:
MM2 从源集群消费 orders → 写入目标集群 source-cluster.orders →
依赖 Kafka Connect 的 Exactly-Once 支持保证不丟不重 →
心跳 Topic(source-cluster.heartbeats)持续写入监控延迟 →
Checkpoint Topic 记录 Consumer Group Offset 映射
MM2 部署架构:
[Source DC]
Kafka Cluster (source-cluster)
|
MM2 Workers (active-standby)
|
[Target DC]
Kafka Cluster (target-cluster)
- source-cluster.orders
- source-cluster.payments
- offsets synced
生产关注事项:
- MM2 Worker 建议部署在目标集群侧,减少跨 DC 延迟对写入的影响。
tasks.max按源集群的分区总数合理设置,单个 Task 处理的分区数不超过 20。- Offset 同步间隔(
emit.checkpoints.interval.seconds)按 RPO 目标设置,典型值 60s。 - MM2 对网络带宽要求高,需确保跨 DC 链路带宽足够(估算:源集群写入吞吐 x 压缩率)。
20 Kafka 的 Cruise Control 自动均衡
答案:
Cruise Control 是 Kafka 集群的自动负载均衡器,通过分析 Broker 的资源利用率和副本分布,自动生成并执行分区重分配方案。
Strimzi 集成 Cruise Control:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
cruiseControl:
config:
goals: >-
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuCapacityGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal
capacity.configFile: /opt/cruise-control/config/capacity.json
hard.goals: >-
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal
anomaly.detection.goals: >-
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
resources:
requests:
memory: 1Gi
cpu: 500m
limits:
memory: 2Gi
cpu: 1
KafkaRebalance CRD 触发均衡:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
labels:
strimzi.io/cluster: my-cluster
spec:
goals:
- RackAwareGoal
- ReplicaCapacityGoal
- DiskCapacityGoal
- NetworkInboundCapacityGoal
- NetworkOutboundCapacityGoal
- CpuCapacityGoal
skipHardGoalCheck: false
Cruise Control 工作流程:
KafkaRebalance CR 创建 → Cruise Control 收到请求 →
Step 1: 收集集群指标(CPU / 磁盘 / 网络 / 副本数)→
Step 2: 构建负载模型 →
Step 3: 运行优化器(Goal-based)→ 生成分区重分配方案 →
Step 4: 更新 KafkaRebalance.status(ProposalReady)→
用户审批方案 → Approve →
Step 5: 执行分区重分配(Kafka Admin API)→
监控进度 → 完成更新 KafkaRebalance.status(Ready)
优化目标(Goals)分类:
| 目标 | 类型 | 说明 |
|---|---|---|
| RackAwareGoal | Hard | 分区副本跨 Rack 分布,必须满足 |
| ReplicaCapacityGoal | Soft | 副本数不超过 Broker 容量 |
| DiskCapacityGoal | Soft | 磁盘使用率跨 Broker 均衡 |
| CpuCapacityGoal | Soft | CPU 使用率跨 Broker 均衡 |
| NetworkInboundCapacityGoal | Soft | 入站网络流量均衡 |
| NetworkOutboundCapacityGoal | Soft | 出站网络流量均衡 |
| LeaderBytesInDistributionGoal | Soft | Leader 写入流量均衡 |
| ReplicaDistributionGoal | Soft | 分区副本数跨 Broker 均衡 |
| TopicReplicaDistributionGoal | Soft | 同一 Topic 副本跨 Broker 均匀分布 |
手动审批模式:
# 等待方案生成
kubectl wait kafkarebalance my-rebalance --for=condition=ProposalReady --timeout=300s
# 审批并执行
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve
# 停止正在执行的 Rebalance
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=stop
异常检测(Anomaly Detector):自动检测 Broker 故障、磁盘接近写满、指标异常波动等场景,自动触发 Rebalance 或告警。
21 Kafka 的备份与灾难恢复
答案:
Kafka on Kubernetes 的备份与灾难恢复覆盖数据层、配置层和元数据层。
备份层级:
| 层级 | 内容 | 工具 | 频率 |
|---|---|---|---|
| Kafka 数据 | Topic 消息数据 | Velero / Strimzi Drain Cleaner / S3 Tiered Storage | 持续 / 增量 |
| Kafka 配置 | CRD 资源定义(Kafka/KafkaTopic/KafkaUser 等) | kubectl / Velero | 每次变更后 |
| Consumer Group | Offset 数据 | MirrorMaker 2 / 内置 Offset Topic | 近实时 |
| ZooKeeper | 元数据 | Velero / ZooKeeper Snapshot | 定时 |
| 证书 | TLS 证书和 CA | cert-manager / Secret 备份 | 每次轮换后 |
Velero 备份方案:
apiVersion: velero.io/v1
kind: Backup
metadata:
name: kafka-full-backup
namespace: velero
spec:
includedNamespaces:
- kafka
labelSelector:
matchLabels:
strimzi.io/cluster: my-cluster
includeClusterResources: true
storageLocation: default
snapshotVolumes: true
hooks:
resources:
- name: pause-kafka
pre:
exec:
container: kafka
command:
- /bin/bash
- -c
- |
# 触发刷盘后暂停写入
/opt/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers --entity-default \
--alter --add-config unclean.leader.election.enable=false
Velero 恢复流程:
1. 准备恢复目标 K8s 集群(确保 StorageClass 兼容)
2. 创建 Velero Restore CR
velero restore create --from-backup kafka-full-backup
3. Velero 恢复 PVC 和 PV 数据
4. Strimzi Operator 恢复 StatefulSet / ConfigMap / Secret
5. Kafka Pod 启动,加载恢复的数据
6. 验证集群状态
- kubectl exec -it my-cluster-kafka-0 -- bin/kafka-topics.sh --list
- 检查 ISR 和 Under Replicated Partitions
7. 恢复 Consumer 从最近的 Committed Offset 开始消费
跨集群灾难恢复(Active-Standby):
[Active DC] [Standby DC]
Kafka Cluster ──MM2──→ Kafka Cluster
│ │
Clients Produce Clients Consume
(主写入) (DR 切换后接管)
切换流程:
1. 停止 Active 侧 Producer
2. 确认 MM2 同步延迟 = 0(无数据丢失)
3. 停止 MM2 Connector
4. 切换 Client DNS / Bootstrap Server 到 Standby
5. 启动 Standby 侧 Producer
6. Consumer 从 Standby 集群继续消费
RPO / RTO 目标:
| 方案 | RPO | RTO | 适用场景 |
|---|---|---|---|
| MM2 异步复制 | < 30s | < 5min | 跨 Region 灾备 |
| CRD GitOps 备份 | < 5min | < 30min | 配置灾难恢复 |
| Velero 备份 | < 1h | < 2h | 数据完全恢复 |
| Tiered Storage | < 1min | < 10min | 对象存储层面的数据恢复 |
22 Kafka 的 KRaft(ZooKeeper-less)架构
答案:
KRaft(Kafka Raft)是 Kafka 从 2.8 版本引入的元数据管理协议,用 Raft 一致性算法替代 ZooKeeper,Kafka 3.5 版本起标记为生产就绪。
ZooKeeper 模式 vs KRaft 模式:
| 维度 | ZooKeeper 模式 | KRaft 模式 |
|---|---|---|
| 元数据存储 | ZooKeeper 集群 | KRaft Quorum(Kafka 内部) |
| 外部依赖 | 需独立部署和运维 ZK 集群 | 无外部依赖 |
| Controller 角色 | Controller + Broker 同节点 | 可独立 Quorum Controller 节点 |
| 元数据同步 | ZK Watch + ZK 写入 | Raft Log Replication |
| 分区数上限 | 约 200K(受 ZK 限制) | 数百万分区 |
| 故障恢复速度 | 分钟级 | 秒级 |
| 部署复杂度 | 需管理两个集群(Kafka + ZK) | 单一集群管理 |
Strimzi KRaft 模式配置:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-kraft-cluster
annotations:
strimzi.io/node-pools: enabled
strimzi.io/kraft: enabled
spec:
kafka:
version: 3.7.0
listeners:
- name: plain
port: 9092
type: internal
tls: false
config:
offsets.topic.replication.factor: 3
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2
storage:
type: persistence-claim
size: 200Gi
KRaft NodePool 角色分离:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: controller
labels:
strimzi.io/cluster: my-kraft-cluster
spec:
replicas: 3
roles:
- controller
storage:
type: persistent-claim
size: 20Gi
resources:
requests:
memory: 2Gi
cpu: 1
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
name: broker
labels:
strimzi.io/cluster: my-kraft-cluster
spec:
replicas: 3
roles:
- broker
storage:
type: persistent-claim
size: 200Gi
resources:
requests:
memory: 8Gi
cpu: 2
KRaft Quorum 选举:
Controller 节点(3 节点 Quorum): majority = floor(3/2) + 1 = 2
任意 1 个 Controller 故障 → Quorum 仍可用
2 个 Controller 故障 → Quorum 不可用 → 元数据不可写
Controller 角色可选组合:
- 纯 Controller 节点(推荐): roles: [controller]
- 纯 Broker 节点: roles: [broker]
- 混合节点: roles: [controller, broker]
KRaft 迁移路径(ZK → KRaft):
Kafka 3.5+ → 启用 ZK 到 KRaft 的迁移工具 →
Step 1: 部署 KRaft Controller Quorum →
Step 2: 配置 Broker 同时连接 ZK 和 KRaft →
Step 3: 元数据从 ZK 迁移至 KRaft →
Step 4: 验证 KRaft 中元数据完整性 →
Step 5: 从 Broker 配置中移除 ZK 连接 →
Step 6: 下线 ZooKeeper 集群
KRaft 生产就绪注意事项:
- Controller 节点磁盘存储元数据日志(__cluster_metadata),建议使用高性能 SSD。
- Controller Quorum 节点数应为奇数(3 或 5),不允许扩展/收缩操作。
- 使用
strimzi.io/kraft: enabled注解时必须同时启用strimzi.io/node-pools: enabled。 - Strimzi 0.39+ KRaft 模式仍标记为 GA 预览(详见 Strimzi 版本发布说明)。
23 Kafka 的在线分区重分配
答案:
在线分区重分配是在不中断服务的情况下,将分区副本从某些 Broker 迁移到其他 Broker 的操作。
触发场景:
| 场景 | 说明 |
|---|---|
| 集群扩容 | 新增 Broker 后需要重新平衡分区 |
| 集群缩容 | Broker 下线前迁移其分区 |
| 热点消除 | 将热点分区从高负载 Broker 迁移 |
| 磁盘使用不均 | 均衡各 Broker 的磁盘使用率 |
| Rack Awareness | 修正不符合 Rack 分布的副本 |
Strimzi 通过 Cruise Control 执行重分配:
# 方式一:KafkaRebalance CRD
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
name: my-rebalance
labels:
strimzi.io/cluster: my-cluster
spec:
goals:
- RackAwareGoal
- ReplicaCapacityGoal
- DiskCapacityGoal
EOF
# 查看方案
kubectl describe kafkarebalance my-rebalance
# 批准执行
kubectl annotate kafkarebalance my-rebalance strimzi.io/rebalance=approve
手动分区重分配(kafka-reassign-partitions.sh):
# 1. 生成 Topic 列表(JSON 格式)
cat > topics-to-move.json <<EOF
{
"topics": [
{"topic": "orders"},
{"topic": "payments"}
],
"version": 1
}
EOF
# 2. 生成重分配方案
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--topics-to-move-json-file /tmp/topics-to-move.json \
--broker-list "0,1,2,3" \
--generate
# 3. 执行重分配
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/reassignment.json \
--execute
# 4. 验证进度
kubectl exec my-cluster-kafka-0 -- bin/kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/reassignment.json \
--verify
重分配的限流与影响控制:
# Kafka CR 中配置限流
spec:
kafka:
config:
# 副本间数据搬迁的限流(B/s),防止影响正常流量
leader.replication.throttled.rate: 104857600 # 100MB/s
follower.replication.throttled.rate: 104857600 # 100MB/s
重分配执行机制:
1. Controller 将分区标记为 Reassigning
2. 目标 Broker 启动 ReplicaFetcher 线程
3. 从 Leader 拉取分区数据到目标 Broker
4. 目标 Broker 数据追平 ISR 后,加入 ISR
5. Controller 从原 Broker 移除旧副本
6. 更新分区元数据
7. 分区回到正常状态
- 整个过程中生产者和消费者不中断。
- 重分配期间集群带宽消耗增加,需设置限流避免影响生产流量。
- 大规模重分配通过 Cruise Control 自动管理更可靠。
24 Kafka 的 Connect 框架与 Connector 管理
答案:
Kafka Connect 是 Kafka 的数据集成框架,用于将外部系统数据导入 Kafka(Source Connector)或从 Kafka 导出到外部系统(Sink Connector)。
KafkaConnect CRD:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 3.7.0
replicas: 2
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
build:
output:
type: docker
image: registry.example.com/kafka-connect:latest
plugins:
- name: debezium-mysql
artifacts:
- type: zip
url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.6.0.Final/debezium-connector-mysql-2.6.0.Final-plugin.zip
sha512sum: <sha512-checksum>
KafkaConnector CRD:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mysql-source-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.debezium.connector.mysql.MySqlConnector
tasksMax: 2
config:
database.hostname: mysql.example.com
database.port: 3306
database.user: debezium
database.password: ${secret:mysql-secret:password}
database.server.id: 1
topic.prefix: cdc
database.include.list: orders,payments
table.include.list: orders.orders,payments.transactions
snapshot.mode: initial
include.schema.changes: true
transforms: Reroute,ExtractField
transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
transforms.Reroute.topic.regex: cdc.orders.orders
transforms.Reroute.topic.replacement: orders-events
Connect Worker 模式:
| 模式 | 配置 | 适用场景 |
|---|---|---|
| Distributed | replicas > 1 | 生产环境,高可用 |
| Standalone | replicas = 1 | 开发测试,非关键 |
Connect 内部 Topic:
| Topic | 用途 |
|---|---|
config.storage.topic | 存储 Connector 配置 |
offset.storage.topic | 存储 Connector Offset |
status.storage.topic | 存储 Connector 和 Task 状态 |
Connector 运行机制:
Connect Worker 启动 →
读取 config.storage.topic 中的 Connector 配置 →
Worker 分配 Connector → Tasks 数量 →
每个 Task 独立运行(并行度 = tasksMax)→
Source Task: poll() → 写入 Kafka → commit() offset
Sink Task: 消费 Kafka → put() 到外部系统 → flush() → commit() offset
Connector 失败处理:
| 配置 | 行为 |
|---|---|
errors.tolerance=none | 任务失败立即停止(默认) |
errors.tolerance=all | 忽略所有错误,跳过错误记录 |
errors.deadletterqueue.topic.name | 错误记录写入死信队列 |
errors.deadletterqueue.context.headers.enable=true | 死信消息附带上下文 Headers |
25 Kafka 的 Schema Registry 集成
答案:
Schema Registry 提供 Avro / Protobuf / JSON Schema 的集中式模式管理,确保 Producer 和 Consumer 之间的数据兼容性。
Strimzi Schema Registry 部署:
# Schema Registry 以独立 Deployment 部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: schema-registry
labels:
app: schema-registry
spec:
replicas: 2
selector:
matchLabels:
app: schema-registry
template:
metadata:
labels:
app: schema-registry
spec:
containers:
- name: schema-registry
image: confluentinc/cp-schema-registry:7.6.0
env:
- name: SCHEMA_REGISTRY_HOST_NAME
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS
value: PLAINTEXT://my-cluster-kafka-bootstrap:9092
- name: SCHEMA_REGISTRY_KAFKASTORE_TOPIC
value: _schemas
- name: SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR
value: "3"
- name: SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL
value: SSL
- name: SCHEMA_REGISTRY_LISTENERS
value: http://0.0.0.0:8081
- name: SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL
value: backward
- name: SCHEMA_REGISTRY_ACCESS_CONTROL_ENABLED
value: "true"
ports:
- containerPort: 8081
resources:
requests:
memory: 1Gi
cpu: 500m
limits:
memory: 2Gi
cpu: 1
模式兼容性策略:
| 策略 | 含义 | 允许变更 |
|---|---|---|
| BACKWARD | 新 Schema 读取旧数据不失败 | 添加可选字段、删除字段 |
| FORWARD | 旧 Schema 读取新数据不失败 | 添加可选字段 |
| FULL | 向前向后都兼容 | 仅添加可选字段 |
| BACKWARD_TRANSITIVE | Schema 对所有历史版本向后兼容 | 从初始版本起仅添加可选字段 |
| FORWARD_TRANSITIVE | Schema 对所有历史版本向前兼容 | 从初始版本起仅删除字段 |
| FULL_TRANSITIVE | 所有版本间全兼容 | 最严格约束 |
| NONE | 不检查兼容性 | 任何修改 |
客户端集成:
# Producer 配置
key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://schema-registry.kafka.svc:8081
auto.register.schemas=true
# Consumer 配置
key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url=http://schema-registry.kafka.svc:8081
specific.avro.reader=true
生产关注事项:
_schemasTopic 必须设置高副本因子(>= 3)和cleanup.policy=compact。- Schema Registry 自身无状态,可水平扩展,但
_schemasTopic Leader 在同一时刻只有一个 Writer。 - 避免频繁注册新 Schema,每个新 Schema 写入
_schemasTopic 成本较高。 - Schema Registry 故障不影响 Kafka Broker 核心链路,但 Producer / Consumer 序列化 / 反序列化会中断。
26 Kafka 的 Quota 限流机制
答案:
Kafka 的 Quota 机制在客户端(Client ID / User)粒度上对网络带宽和请求速率进行限流,防止单个客户端耗尽 Broker 资源。
Quota 类型:
| Quota 类型 | 配置参数 | 粒度 | 说明 |
|---|---|---|---|
| 网络带宽 | producer_byte_rate / consumer_byte_rate | Client ID / User | 限制生产/消费的字节速率(B/s) |
| 请求速率 | request_percentage | Client ID / User | 限制占用 I/O 线程和网络线程的时间百分比 |
| Controller 变更 | controller_mutation_rate | Client ID / User | 限制客户端创建 Topic、分区、删除 Topic 的速率 |
| CPU | cpu_utilization | Client ID / User | 限制连接占用的 CPU 时间百分比 |
Strimzi 中通过 KafkaUser 设置 Quota:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: my-user
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
quotas:
producerByteRate: 10485760 # 10MB/s 生产限流
consumerByteRate: 20971520 # 20MB/s 消费限流
requestPercentage: 50 # 50% I/O 线程占用
Quota 生效优先级:
<user, client-id> Quota → 最高优先级
<user> Quota → 次优先级
<client-id> Quota → 再次优先级
<user, client-id> default → 默认 Quota
动态设置 Quota(kafka-configs.sh):
# 设置 User 级别生产带宽限流
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type users --entity-name my-user \
--alter --add-config producer_byte_rate=10485760
# 设置 Client ID 级别请求率限流
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type clients --entity-name app-producer \
--alter --add-config request_percentage=30
# 查看当前 Quota 配置
kubectl exec my-cluster-kafka-0 -- bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type users --entity-name my-user --describe
限流时的行为:
- 超出
producer_byte_rate/consumer_byte_rate的网络数据传输被延迟,连接不会断开。 - 超出
request_percentage的请求被延迟处理,客户端感知为请求延迟增加。 - 延迟机制由 Broker 端的
quota.window.num(采样窗口数)和quota.window.size.seconds(窗口大小)控制。
Quota 监控:
| 指标 | 说明 |
|---|---|
| `kafka.server:type={Produce | Fetch},name=throttle-time` |
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec | 入站字节速率 |
kafka.server:type=ClientQuotaManager,name=ThrottleTime | 总限流延迟统计 |
27 Kafka 的热点分区检测与处理
答案:
热点分区指流量、数据量或延迟显著高于同 Topic 其他分区的分区,会导致 Broker 负载不均、Consumer 消费倾斜。
热点分区的成因:
| 成因 | 说明 |
|---|---|
| 消息 Key 分布不均 | 特定 Key 的消息量远大于其他 Key |
| 分区分配倾斜 | 流量高分区集中在少数 Consumer |
| Broker 硬件差异 | Leader 分布在性能不同的 Broker 上 |
| 同一 Broker 承载过多 Leader | Controller 分区分配策略导致 Leader 集中 |
检测方法:
# 按 Topic 分组查看各分区的写入字节速率
kubectl exec my-cluster-kafka-0 -- bin/kafka-run-class.sh \
kafka.tools.JmxTool \
--object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=orders \
--reporting-interval 1000
# 通过 kafka-log-dirs.sh 查看各分区磁盘占用
kubectl exec my-cluster-kafka-0 -- bin/kafka-log-dirs.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic-list orders
# 查看 Topic 分区 Leader 分布
kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe --topic orders
Prometheus 热点分区查询:
# 单分区写入速率(检测热点)
topk(10, sum(rate(kafka_server_broker_topic_metrics_bytesinpersec[5m])) by (topic, partition))
# 分区大小 Top 10
topk(10, sum(kafka_log_log_size) by (topic, partition))
# 单 Broker 上 Leader 分区数
count(kafka_server_replicamanager_leadercount) by (pod)
处理策略:
| 策略 | 方法 | 效果 |
|---|---|---|
| 修改分区 Key | 增加 Key 粒度或散列化 Key | 从源头均衡数据分布 |
| 增加分区数 | 提高 Topic 分区数 | 分散数据到更多分区 |
| Leader 均衡 | 触发 Preferred Leader Election | 均匀分布 Leader |
| 分区重分配 | Cruise Control / kafka-reassign-partitions | 迁移分区到低负载 Broker |
| Consumer 调整 | 调整 max.poll.records、增加 Consumer 实例 | 消除消费倾斜 |
Strimzi 自动 Leader 均衡:
spec:
kafka:
config:
auto.leader.rebalance.enable: true
leader.imbalance.per.broker.percentage: 10 # 不平衡阈值 10%
leader.imbalance.check.interval.seconds: 300 # 每 5 分钟检查
Consumer 端处理热点:
// 自定义 Partitioner:热点 Key 拆分为多个子 Key
public class ConsistentHashPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
String keyStr = (String) key;
if (isHotKey(keyStr)) {
// 热点 Key 附加随机后缀散列
keyStr = keyStr + "-" + ThreadLocalRandom.current().nextInt(10);
}
return Math.abs(keyStr.hashCode()) % cluster.partitionCountForTopic(topic);
}
}
28 Kafka 的生产者幂等性(Idempotent Producer)
答案:
生产者幂等性确保在网络重试等异常场景下,同一条消息不会在 Broker 中被重复写入,是 Exactly-Once 语义的基础。
幂等性实现原理:
Producer 初始化 → 向 Broker 发送 InitProducerIdRequest →
Broker 分配 Producer ID(PID,单调递增的唯一 ID)→
Producer 为每个 <PID, TopicPartition> 维护 Sequence Number(从 0 开始)→
发送消息时附加 <PID, Epoch, SequenceNumber> →
Broker 端维护每个 <PID, TopicPartition> 已确认的最大 SequenceNumber →
收到消息:
1. Sequence == lastSeq+1 → 接受,更新 lastSeq
2. Sequence <= lastSeq → 重复消息,丢弃,返回 ACK(不重复写入)
3. Sequence > lastSeq+1 → 乱序,抛出 OutOfOrderSequenceException
幂等性配置:
enable.idempotence=true # 启用幂等
acks=all # 必须设为 all
max.in.flight.requests.per.connection=5 # ≤ 5
retries=2147483647 # 无限重试
Broker 端幂等性相关存储:
Producer ID 分配:
- ZK 模式:存储在 /latest_producer_id_block ZNode 中
- KRaft 模式:存储在 Controller 元数据日志中
Sequence Number 维护:
- 每个 Broker 的内存中维护 Snapshot 文件
- 保存最近 N 个 Producer 的 <PID, Epoch, SequenceNumber> 状态
- Snapshot 定期写入磁盘,Broker 启动时恢复
幂等性的局限:
| 场景 | 幂等性是否有效 | 说明 |
|---|---|---|
| Producer 内部重试 | 有效 | 相同 PID + SequenceNumber 被去重 |
| Producer 重启 | 有效(PID 不变) | 但 PID 可能因 Broker 再分配而改变 |
| Producer 实例更换 | 需配合事务 | 新实例分配新 PID,无法跨 PID 去重 |
| 跨 Session 消息 | 需配合事务 | 不同 Producer Session 无法保证去重 |
与事务的配合:
幂等生产者 → 单分区 / 单 Session 内的去重保证
事务生产者 →
跨分区原子写入 +
Consumer-Transform-Produce 循环中的 Exactly-Once +
跨 Session 去重(基于 TransactionalId 而非 PID)
幂等性丢失服务保护:
- 设置
transactional.id将去重标识从 PID 切换到 TransactionalId。 - 配合
transaction.timeout.ms控制事务超时后的资源回收。 max.in.flight.requests.per.connection=5与幂等性兼容(Kafka 1.0+),不必设为 1。
29 Kafka 常见故障排查
答案:
Kafka on Kubernetes 常见故障场景与排查方法。
场景一:Broker Pod 无法启动
症状:Pod 状态 CrashLoopBackOff / Pending
排查步骤:
1. kubectl describe pod my-cluster-kafka-0
2. kubectl logs my-cluster-kafka-0 -c kafka
3. 常见原因:
- PVC 无法挂载(StorageClass 不兼容 / PV 未创建)
- 内存不足(JVM OOMKilled)
- ZooKeeper 连接失败(ZK Pod 未就绪 / DNS 不可解析)
- 证书过期(TLS 证书校验失败)
- config 中的 broker.id 冲突
4. 解决:
- kubectl describe pvc data-my-cluster-kafka-0
- 检查 JVM heap 配置:-Xms 与 -Xmx 与容器 memory limit 匹配
- 验证 ZK 服务:kubectl exec my-cluster-zk-0 -- bin/zookeeper-shell.sh localhost:2181 stat
场景二:Under Replicated Partitions 持续 > 0
症状:kafka_server_replica_manager_underreplicated_partitions > 0 持续超过 5 分钟
排查步骤:
1. kubectl exec my-cluster-kafka-0 -- bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
2. 检查受影响 Broker 的状态
3. 常见原因:
- 某一 Broker 宕机或 OOM 重启
- 网络分区导致 Broker 间通信中断
- 磁盘 I/O 瓶颈导致 Follower 追不上 Leader
- Replica Fetcher 线程不足
4. 解决:
- 恢复故障 Broker
- 调大 num.replica.fetchers(默认 1 → 4)
- 检查磁盘 I/O 指标,必要时迁移分区
场景三:Consumer Lag 持续增长
症状:Consumer Group Lag 单调递增
排查步骤:
1. kubectl exec my-cluster-kafka-0 -- bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --group order-processor --describe
2. 对比 Produce 速率与 Consumer 消费速率
3. 常见原因:
- Consumer 实例数量不足
- Consumer 处理逻辑慢(外部 API 调用 / DB 查询慢)
- Consumer 频繁 Rebalance 导致消费暂停
- 某些分区 Lag 高但其他分区 Lag 低(热点分区)
4. 解决:
- 增加 Consumer 实例(不超过分区数)
- 优化 Consumer 处理逻辑
- 启用 Cooperative Rebalance 减少 Rebalance 暂停时间
- 增加 max.poll.records 减少 Poll 频率
- 热点分区通过修改 Key 策略处理
场景四:Producer 请求超时
症状:Producer 端报 TimeoutException / request timed out
排查步骤:
1. 检查 Broker 端 produce 请求延迟
kafka_network_request_metrics_totaltimems_produce
2. 检查 Broker GC 暂停时间
jvm_gc_pause_seconds
3. 检查磁盘 I/O Util
4. 常见原因:
- acks=all 且 ISR 中有 Follower 延迟过高
- Broker 磁盘 I/O 饱和
- 网络带宽打满
- JVM GC 频繁导致 Broker 无响应
5. 解决:
- Producer acks 从 all 降为 1(非关键业务)
- 调整 JVM GC 参数减少暂停
- 扩容 Broker 或增加网络带宽
场景五:KafkaTopic CR 状态 NotReady
症状:KafkaTopic CR status.conditions 中出现 NotReady
排查步骤:
1. kubectl describe kafkatopic my-topic
2. kubectl logs deploy/my-cluster-entity-operator -c topic-operator
3. 常见原因:
- Topic Operator 无法连接 Kafka Admin API
- 分区数或副本因子超过集群容量
- Topic 名称不符合规范
- Kafka Broker 端 ACL 限制 Entity Operator 操作
4. 解决:
- 验证 Entity Operator 的 TLS 证书未过期
- 检查 KafkaTopic.spec 参数合法性
- 确认 Entity Operator 在 superUsers 列表中
30 Kafka on Kubernetes 生产环境最佳实践
答案:
Kafka on Kubernetes 生产部署需覆盖集群规划、资源配置、安全加固、监控告警和运维自动化五个维度。
集群规划:
| 维度 | 建议 |
|---|---|
| K8s 版本 | >= 1.27(支持 Strimzi 0.39+),优先选择长期支持版本 |
| 节点隔离 | Kafka Broker 专用节点组(NodeSelector / Taint + Toleration) |
| 可用区分布 | Broker 跨 3 个 AZ 分布(Rack Awareness 配置) |
| 网络 CNI | Cilium / Calico,支持 NetworkPolicy |
| 存储 | 独立的 StorageClass(NVMe SSD 优先),禁用 PVC 自动删除 |
| 集群规模 | 单集群分区数 < 200K,单个 Broker 分区数 < 6K |
资源配置:
spec:
kafka:
replicas: 3
resources:
requests:
memory: 8Gi
cpu: 2
limits:
memory: 16Gi
cpu: 4
jvmOptions:
-Xms: 6144m
-Xmx: 6144m # heap = limit * 0.5(剩余给 Page Cache)
-XX:+UseG1GC
-XX:MaxGCPauseMillis: 20
-XX:G1HeapRegionSize: 32m
-XX:MetaspaceSize: 128m
-XX:MaxMetaspaceSize: 256m
-XX:+ExitOnOutOfMemoryError
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 500Gi
class: nvme-ssd
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: strimzi.io/name
operator: In
values:
- my-cluster-kafka
topologyKey: kubernetes.io/hostname
tolerations:
- key: dedicated
operator: Equal
value: kafka
effect: NoSchedule
serviceAccount:
metadata:
annotations:
iam.gke.io/gcp-service-account: [email protected]
安全加固清单:
| 安全维度 | 措施 |
|---|---|
| 网络 | TLS 加密所有 Listener,NetworkPolicy 仅允许必要端口入站 |
| 认证 | SASL SCRAM-SHA-512 或 mTLS,禁用 Plain Listener |
| 授权 | KafkaUser ACL 最小权限,Entity Operator 配置 superUsers |
| 存储 | PVC 加密(StorageClass 开启 Encryption),Secret 使用 External Secrets Operator |
| 审计 | 启用 Kafka Authorizer 日志,汇聚至集中日志平台 |
| 镜像 | 使用官方 Strimzi 镜像,定期扫描 CVE 并更新 |
运维自动化:
# Cruise Control 异常检测
spec:
cruiseControl:
config:
anomaly.detection.goals: >-
com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal,
com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal
failed.brokers.zk.session.timeout.ms: 15000
metric.anomaly.finder.class: >-
com.linkedin.kafka.cruisecontrol.detector.NoopMetricAnomalyFinder
goal.violation.detection.interval.ms: 300000
| 运维场景 | 自动化方案 |
|---|---|
| 滚动升级 | Strimzi Operator 自动滚动升级,可配置 inter.broker.protocol.version 逐步升级 |
| 集群扩缩容 | 修改 kafka.replicas 后 Operator 自动处理,配合 Cruise Control Rebalance |
| 证书轮换 | Strimzi CA 自动轮换,证书过期前 30 天自动更新 |
| 分区均衡 | KafkaRebalance CRD 或 Cruise Control Anomaly Detector 定时均衡 |
| 备份恢复 | Velero 定时备份,MM2 跨集群灾备 |
| 容量规划 | Prometheus 指标 + 线性回归预测磁盘/分区增长趋势 |
关键告警规则:
groups:
- name: kafka-critical
rules:
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replica_manager_underreplicatedpartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka Under Replicated Partitions = XQOPEN $value XQCLOSE"
- alert: KafkaUnderMinISR
expr: kafka_server_replica_manager_underminisrpartitioncount > 0
for: 2m
labels:
severity: critical
- alert: KafkaBrokerDown
expr: kafka_server_replicamanager_leadercount < 1
for: 2m
labels:
severity: critical
- alert: KafkaActiveControllerCount
expr: sum(kafka_controller_activecontroller) != 1
for: 1m
labels:
severity: critical
- alert: KafkaHighConsumerLag
expr: kafka_consumergroup_group_lag > 100000
for: 10m
labels:
severity: warning
- alert: KafkaDiskUsageHigh
expr: (kafka_log_log_size / kafka_log_log_size_limit) > 0.85
for: 5m
labels:
severity: warning
版本升级策略:
Strimzi 版本升级流程:
1. 阅读 Strimzi 版本升级说明(Upgrade Guide)
2. 备份 Kafka CRD 和配置
3. 升级 Strimzi Cluster Operator(Helm / OLM)
4. 升级 CRD 定义
5. 升级 Kafka CR 中的 version 字段(Kafka 版本)
6. Operator 自动触发 Broker 滚动升级
7. 验证集群状态和客户端兼容性
跨大版本升级原则:
- 不跳版本升级(如 3.5 → 3.6 → 3.7)
- 先升级 inter.broker.protocol.version,再升级 log.message.format.version
- 客户端版本与 Broker 版本的兼容性区间为 ±1 大版本