← 返回文章列表

Kafka 集群运维避坑指南

Apache Kafka 作为高吞吐量的分布式消息系统,在生产环境中扮演着重要角色。但 Kafka 集群运维过程中会遇到各种坑点,本文将总结常见的坑点和解决方案,帮助你更好地运维 Kafka 集群。

一、副本同步问题

1.1 ISR 列表缩小

当 follower 副本无法及时同步 leader 数据时,会被踢出 ISR(In-Sync Replicas)列表,导致可用副本减少。

1.2 解决方案

# 调整副本同步参数
# server.properties

# 增加 replica.lag.time.max.ms,避免网络波动导致副本被踢出
replica.lag.time.max.ms=30000  # 默认 10000

# 调整 replica.socket.timeout.ms
replica.socket.timeout.ms=30000

# 增加 replica.fetch.wait.max.ms
replica.fetch.wait.max.ms=500

# 检查副本同步情况
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

1.3 UnderReplicated Partitions 监控

# 查看 under-replicated 分区
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions

# JMX 指标监控
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount

二、消费者组重平衡问题

2.1 重平衡触发原因

  • 消费者加入或离开消费者组
  • Topic 分区数变化
  • 消费者心跳超时
  • 消费者处理消息超时

2.2 频繁重平衡解决方案

# 消费者配置优化
props.put("session.timeout.ms", "30000");        // 默认 10000
props.put("heartbeat.interval.ms", "10000");     // 默认 3000
props.put("max.poll.interval.ms", "300000");     // 默认 300000
props.put("max.poll.records", "500");            // 默认 500

# 避免在处理消息时提交 offset
# 使用手动提交,确保消息处理完成后再提交
props.put("enable.auto.commit", "false");

2.3 消费者组延迟监控

# 查看消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group my-consumer-group

# 重点关注以下指标:
# CURRENT-OFFSET: 当前消费偏移量
# LOG-END-OFFSET: 最新偏移量
# LAG: 消费延迟(LOG-END-OFFSET - CURRENT-OFFSET)

三、磁盘与存储问题

3.1 磁盘空间不足

# 配置日志保留策略
# server.properties

# 按时间保留(默认 7 天)
log.retention.hours=168

# 按大小保留(默认不限制)
log.retention.bytes=107374182400  # 100GB

# 按 segment 大小滚动
log.segment.bytes=1073741824  # 1GB

# 检查日志大小
du -sh /var/lib/kafka-logs/*

3.2 磁盘 IO 瓶颈

# 监控磁盘 IO
iostat -x 1

# Kafka 相关指标
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec

# 优化建议:
# 1. 使用 SSD 替代 HDD
# 2. 数据目录分布在多个磁盘
# 3. 调整 log.flush.interval.messages 和 log.flush.interval.ms

3.3 磁盘故障处理

# 1. 识别故障磁盘
# 查看日志中的错误信息

# 2. 将故障磁盘上的分区迁移到其他磁盘
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassign.json --execute

# 3. 临时降低副本数(紧急情况)
kafka-topics.sh --bootstrap-server localhost:9092 \
    --alter --topic my-topic --partitions 6

四、网络与连接问题

4.1 Connection 数过多

# 查看当前连接数
netstat -an | grep 9092 | wc -l

# 调整操作系统参数
# /etc/sysctl.conf
net.core.somaxconn = 1024
net.ipv4.tcp_max_syn_backlog = 1024

# Kafka 连接配置
# server.properties
connections.max.idle.ms=600000  # 默认 10 分钟

4.2 网络分区问题

# 配置最小 ISR
# server.properties
min.insync.replicas=2  # 确保至少 2 个副本同步

# Topic 级别配置
kafka-topics.sh --bootstrap-server localhost:9092 \
    --create --topic my-topic --partitions 6 --replication-factor 3 \
    --config min.insync.replicas=2

# 生产者配置 acks=all,确保数据写入 ISR
props.put("acks", "all");
props.put("retries", "3");

五、ZooKeeper 相关问题

5.1 ZooKeeper 会话超时

# 调整 ZooKeeper 会话超时
# server.properties
zookeeper.session.timeout.ms=18000  # 默认 18000
zookeeper.connection.timeout.ms=10000

# ZooKeeper 配置优化
# zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
maxClientCnxns=60

5.2 ZooKeeper 数据清理

# 自动清理旧快照
# zoo.cfg
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

# 手动清理(谨慎操作)
rm -rf /var/lib/zookeeper/version-2/log.*
rm -rf /var/lib/zookeeper/version-2/snapshot.*

六、性能调优

6.1 Producer 性能优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 批处理优化
props.put("batch.size", "16384");        // 默认 16KB
props.put("linger.ms", "5");             // 等待 5ms 批量发送
props.put("buffer.memory", "33554432");  // 32MB 缓冲区

// 压缩
props.put("compression.type", "snappy");  // snappy, lz4, gzip

// 异步发送
ProducerRecord<String, String> record = 
    new ProducerRecord<>("my-topic", "key", "value");
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
});

6.2 Consumer 性能优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 拉取优化
props.put("fetch.min.bytes", "1048576");      // 至少拉取 1MB
props.put("fetch.max.wait.ms", "500");        // 最多等待 500ms
props.put("max.partition.fetch.bytes", "1048576");  // 每个分区 1MB

// 并行处理
props.put("max.poll.records", "1000");  // 每次拉取 1000 条

6.3 Broker 性能优化

# server.properties

# 网络线程数
num.network.threads=8  # 默认 3

# IO 线程数
num.io.threads=16  # 默认 8

# Socket 缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志刷盘策略
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 页缓存
log.flush.scheduler.interval.ms=3000

七、监控与告警

7.1 关键监控指标

指标 说明 告警阈值
UnderReplicatedPartitions 未充分复制分区数 > 0
OfflinePartitions 离线分区数 > 0
Consumer Lag 消费延迟 > 10000
Disk Usage 磁盘使用率 > 80%

7.2 常用监控命令

# 查看集群状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# 查看 Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe

# 查看消费者组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 查看指定消费者组详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group my-group

八、最佳实践总结

  1. 合理规划 Topic:分区数根据并发需求设置,副本数至少为 3
  2. 配置 min.insync.replicas:确保数据可靠性
  3. 监控消费者延迟:及时发现消费能力不足问题
  4. 定期清理过期数据:避免磁盘空间不足
  5. 使用 SSD:提升磁盘 IO 性能
  6. 配置合理的 JVM 参数:避免 GC 停顿
  7. 做好监控告警:关键指标实时告警
  8. 定期备份配置:防止配置丢失

Kafka 运维是一项持续的工作,需要不断积累经验。希望本文能帮助你避开常见的坑,更好地运维 Kafka 集群!