找回密码
立即注册
搜索
热搜: Java Python Linux Go
发回帖 发新帖

2838

积分

0

好友

380

主题
发表于 3 天前 | 查看: 19| 回复: 0

1️⃣ 适用场景 & 前置条件

项目 要求
适用场景 Kafka 集群容量不足、性能瓶颈、新增 Broker 节点、分区负载不均、数据迁移
OS RHEL/CentOS 7.9+, Ubuntu 20.04+
内核 Linux Kernel 4.18+
Kafka 版本 Kafka 2.8.0+ (推荐 3.4.0+, 支持 KRaft 模式)
Java 版本 OpenJDK 11+ 或 Oracle JDK 11+
ZooKeeper 3.6.0+ (如使用 ZooKeeper 模式)
资源规格 新增 Broker: 8C16G / 500GB SSD (推荐 1TB NVMe SSD)
网络 ≥10Gbps 网卡(数据迁移带宽密集)
权限 Kafka 用户权限,sudo 权限(安装依赖)
技能要求 熟悉 Kafka 架构、分区副本机制、Linux 命令、Shell 脚本
业务影响 需评估业务高峰期,建议低峰期执行(数据迁移影响性能)

2️⃣ 反模式警告(何时不适用)

⚠️ 以下场景不推荐使用本方案:

  1. 单 Broker 集群:无法做分区再平衡,应先扩容到 3 Broker 再执行  
  2. 副本因子 = 1 的集群:数据无冗余,迁移过程中存在丢失风险,应先提升副本因子到 2+  
  3. 老版本 Kafka (< 2.4):缺乏 kafka-reassign-partitions.sh 高级功能,建议先升级  
  4. 磁盘故障场景:应先修复故障 Broker,而非扩容  
  5. 网络带宽不足 (< 1Gbps):数据迁移会耗尽带宽,影响业务  
  6. 未监控集群状态:无法评估扩容效果,应先部署 Kafka Manager/Cruise Control

替代方案对比:

场景 推荐方案 理由
临时流量激增 增加 Consumer 实例 无需扩容 Broker,快速响应
单个 Topic 热点 增加分区数 提升并行度,无需整体扩容
数据保留过长 调整 retention.ms 释放磁盘空间,避免扩容
磁盘 I/O 瓶颈 更换 SSD/NVMe 比扩容更直接解决问题

3️⃣ 环境与版本矩阵

组件 RHEL/CentOS Ubuntu/Debian 测试状态
OS 版本 RHEL 8.7+ / CentOS Stream 9 Ubuntu 22.04 LTS [已实测]
内核版本 4.18.0-425+ 5.15.0-60+ [已实测]
Kafka 2.8.2 (ZooKeeper) / 3.4.0 (KRaft) 2.8.2 (ZooKeeper) / 3.4.0 (KRaft) [已实测]
ZooKeeper 3.6.4 / 3.8.1 3.6.4 / 3.8.1 [已实测]
Java OpenJDK 11.0.18+ / 17.0.6+ OpenJDK 11.0.18+ / 17.0.6+ [已实测]
最小规格 8C16G / 500GB SSD 8C16G / 500GB SSD
推荐规格 16C32G / 1TB NVMe SSD 16C32G / 1TB NVMe SSD
网络带宽 10Gbps 10Gbps

版本差异说明:

  • Kafka 2.8 vs 3.4:3.4 支持 KRaft 模式(无需 ZooKeeper),生产可用  
  • ZooKeeper 3.6 vs 3.8:3.8 支持 TLS 加密、审计日志  
  • Java 11 vs 17:17 性能提升 10–15%,但 Kafka 3.0+ 才完全支持  

4️⃣ 阅读导航

📖 建议阅读路径:

  • 快速上手(30分钟 - 自动化扩容):→ 章节 5(快速清单) → 章节 6(实施步骤 Step 1–6) → 章节 13(关键脚本)  
  • 深入理解(90分钟 - 手动精细控制):→ 章节 7(分区再平衡原理) → 章节 6(实施步骤完整版) → 章节 8(性能调优) → 章节 11(最佳实践)  
  • 故障排查:→ 章节 9(常见故障与排错) → 章节 10(回滚与数据恢复)  

5️⃣ 快速清单(Checklist)

  • • [ ] 准备阶段(执行前 1 天)  

    • • [ ] 评估集群当前负载(CPU/内存/磁盘/网络)  
    • • [ ] 检查集群健康状态(所有 Broker 在线,无 Under-Replicated 分区)  
    • • [ ] 备份 ZooKeeper 数据(zkCli.sh 导出配置)  
    • • [ ] 准备新增 Broker 节点(安装 Kafka,配置文件)  
    • • [ ] 通知业务方维护窗口(数据迁移影响性能)  
  • • [ ] 扩容阶段(执行中)  

    • • [ ] 启动新增 Broker 节点(kafka-server-start.sh)  
    • • [ ] 验证新 Broker 已加入集群(kafka-broker-api-versions.sh)  
    • • [ ] 生成分区再平衡计划(kafka-reassign-partitions.sh --generate)  
    • • [ ] 审核再平衡计划(检查数据迁移量、预估时间)  
    • • [ ] 执行分区再平衡(kafka-reassign-partitions.sh --execute)  
  • • [ ] 监控阶段(执行中)  

    • • [ ] 实时监控迁移进度(kafka-reassign-partitions.sh --verify)  
    • • [ ] 监控集群性能指标(CPU/网络/磁盘 I/O)  
    • • [ ] 监控 Consumer Lag(确保业务不受影响)  
    • • [ ] 监控 Under-Replicated 分区数量(应为 0)  
  • • [ ] 验证阶段(执行后)  

    • • [ ] 确认分区再平衡完成(所有分区已迁移)  
    • • [ ] 验证数据完整性(Topic 消息数、Offset 范围)  
    • • [ ] 验证负载均衡效果(各 Broker CPU/磁盘/网络均衡)  
    • • [ ] 执行压测(验证集群性能提升)  
  • • [ ] 清理阶段(执行后)  

    • • [ ] 删除临时文件(再平衡计划 JSON 文件)  
    • • [ ] 更新监控基线(新的 Broker 指标)  
    • • [ ] 输出扩容报告(扩容前后对比、业务影响)  

6️⃣ 实施步骤

架构与数据流说明(文字描述)

Kafka 集群扩容架构:

扩容前(3 Broker):
  Topic: my-topic (12 分区, 副本因子 2)
  Broker-1: 分区 0,1,2,3 (Leader) + 副本
  Broker-2: 分区 4,5,6,7 (Leader) + 副本
  Broker-3: 分区 8,9,10,11 (Leader) + 副本
  → 负载不均:Broker-1 磁盘 80%, Broker-2/3 仅 40%

扩容后(5 Broker):
  Broker-1: 分区 0,1,2 (Leader) + 副本
  Broker-2: 分区 3,4,5 (Leader) + 副本
  Broker-3: 分区 6,7,8 (Leader) + 副本
  Broker-4: 分区 9,10 (Leader) + 副本 (新增)
  Broker-5: 分区 11 (Leader) + 副本 (新增)
  → 负载均衡:所有 Broker 磁盘约 50%

分区再平衡数据流:

1. 生成再平衡计划
   ↓
   kafka-reassign-partitions.sh --generate
   ↓ 输出 JSON 文件:指定每个分区的目标 Broker

2. 执行再平衡
   ↓
   kafka-reassign-partitions.sh --execute
   ↓ Kafka Controller 协调数据迁移

3. 数据迁移流程(针对每个分区)
   ↓
   源 Broker (Leader) 读取分区数据
   ↓ 网络传输
   目标 Broker 写入磁盘
   ↓
   目标 Broker 追赶 Leader (Catch-up)
   ↓
   目标 Broker 成为新 ISR 成员
   ↓
   完成迁移,Leader 选举(如需要)

4. 验证完成
   ↓
   kafka-reassign-partitions.sh --verify
   ↓ 所有分区状态:Successfully completed

关键组件与决策点:

  1. Controller:集群控制器,负责协调分区分配、Leader 选举  
  2. ISR(In-Sync Replicas):同步副本集合,数据迁移完成后加入 ISR  
  3. Throttle(限流):控制迁移带宽,避免影响业务(关键参数)  
  4. Leader 选举:迁移完成后可能触发 Leader 重新选举,短暂影响吞吐量  

时间评估公式:

迁移时间(小时) = 需迁移数据量(GB) / (迁移带宽(MB/s) * 3600) * 1.5
示例:1TB 数据, 100MB/s 带宽, 预计 4.2 小时

Step 1: 评估集群当前状态

目标: 确认集群健康,评估扩容必要性  

RHEL/CentOS 命令:

# 1. 检查所有 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep -E "^[0-9]+"
# 预期输出:所有 Broker ID 及支持的 API 版本

# 2. 检查集群元数据
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print
# 或使用 ZooKeeper 查询(ZooKeeper 模式)
zkCli.sh -server localhost:2181 ls /brokers/ids
# 预期输出:[1, 2, 3]

# 3. 检查 Topic 分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 预期输出:
# Topic: my-topic  PartitionCount: 12  ReplicationFactor: 2
# Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
# Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
# ...

# 4. 检查 Under-Replicated 分区(应为 0)
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
# 预期输出:无输出(健康状态)

# 5. 检查 Broker 磁盘使用率
for broker in kafka-broker-{1..3}; do
echo "=== $broker ==="
  ssh $broker "df -h /kafka-logs | tail -n 1"
done
# 预期输出:各 Broker 磁盘使用率

# 6. 检查 Broker 负载(CPU/网络)
for broker in kafka-broker-{1..3}; do
echo "=== $broker ==="
  ssh $broker "uptime; iostat -x 1 3 | grep -A 1 'Device'"
done

# 7. 检查 Consumer Lag(业务影响评估)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
# 预期输出:LAG 列应 < 10000(根据业务容忍度调整)

Ubuntu/Debian 命令:

# 相同命令,Kafka 工具路径一致
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

# ZooKeeper 检查(如使用 apt 安装)
/usr/share/zookeeper/bin/zkCli.sh -server localhost:2181 ls /brokers/ids

关键参数解释:

  1. --under-replicated-partitions:显示副本数不足的分区(健康集群应为 0)  
  2. Isr:In-Sync Replicas,同步副本列表(应包含所有 Replicas)  
  3. LAG:Consumer 消费延迟,过高说明消费能力不足(可能是扩容原因)  

执行后验证:

# 确认无 Under-Replicated 分区
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l
# 预期输出:0

# 确认所有 Broker 在线
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep -E "^[0-9]+" | wc -l
# 预期输出:当前 Broker 数量(如 3)

扩容决策标准:

指标 扩容阈值 当前值示例 是否扩容
磁盘使用率 > 70% Broker-1: 80% ✅ 需要
CPU 使用率 > 80% Broker-1: 85% ✅ 需要
网络带宽 > 80% 峰值 8Gbps (10Gbps 网卡) ✅ 需要
Consumer Lag 持续 > 10000 LAG: 50000 ✅ 需要
Under-Replicated 分区 > 0 5 个分区 ❌ 先修复再扩容

[已实测] 真实场景:

  • • 3 Broker 集群,12 个 Topic,总分区数 120  
  • • Broker-1 磁盘 80%(热点 Topic 集中)  
  • • Broker-2/3 磁盘 40%  
  • • 决策:新增 2 个 Broker,重新平衡负载  

Step 2: 准备新增 Broker 节点

目标: 安装 Kafka,配置参数,确保与现有集群兼容  

RHEL/CentOS 命令:

# 1. 安装 Java(如未安装)
sudo yum install -y java-11-openjdk java-11-openjdk-devel

# 验证 Java 版本
java -version
# 预期输出:openjdk version "11.0.18" 或更高

# 2. 下载 Kafka(与现有集群版本一致)
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar xzf kafka_2.13-3.4.0.tgz
sudo ln -s kafka_2.13-3.4.0 kafka

# 3. 创建 Kafka 用户
sudo useradd -r -s /bin/false kafka
sudo mkdir -p /kafka-logs
sudo chown -R kafka:kafka /opt/kafka /kafka-logs

# 4. 配置 Kafka (server.properties)
sudo tee /opt/kafka/config/server.properties > /dev/null <<'EOF'
# Broker ID(新增节点使用未占用的 ID,如 4, 5)
broker.id=4

# 监听地址(修改为实际 IP)
listeners=PLAINTEXT://192.168.1.14:9092
advertised.listeners=PLAINTEXT://192.168.1.14:9092

# 日志目录
log.dirs=/kafka-logs

# ZooKeeper 连接(与现有集群一致)
zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka

# 分区数(默认)
num.partitions=12

# 副本因子(默认)
default.replication.factor=2

# 日志保留时间(7 天)
log.retention.hours=168

# 日志段大小(1GB)
log.segment.bytes=1073741824

# 性能优化参数
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 副本同步参数
replica.fetch.max.bytes=1048576
replica.socket.timeout.ms=30000

# JMX 监控端口
#jmx.port=9999
EOF

# 5. 配置 JVM 参数(根据内存调整)
sudo tee /opt/kafka/bin/kafka-server-start.sh > /dev/null <<'EOF'
#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G" # 16GB 内存服务器使用 8G 堆
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M"
exec /opt/kafka/bin/kafka-server-start.sh "$@"
EOF
sudo chmod +x /opt/kafka/bin/kafka-server-start.sh

# 6. 创建 systemd 服务
sudo tee /etc/systemd/system/kafka.service > /dev/null <<'EOF'
[Unit]
Description=Apache Kafka Server
After=network.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx8G -Xms8G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=100000

[Install]
WantedBy=multi-user.target
EOF

# 7. 启动 Kafka
sudo systemctl daemon-reload
sudo systemctl enable kafka
sudo systemctl start kafka

# 等待启动
sleep 10

Ubuntu/Debian 命令:

# 安装 Java
sudo apt update
sudo apt install -y openjdk-11-jdk

# 其余步骤相同
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar xzf kafka_2.13-3.4.0.tgz
# ... 其余同上

关键参数解释:

  1. broker.id=4:新 Broker ID,必须唯一,不能与现有 Broker 冲突  
  2. advertised.listeners:Kafka 对外暴露的地址,必须是其他节点可访问的 IP  
  3. zookeeper.connect:ZooKeeper 集群地址,必须与现有集群一致(包括 chroot /kafka)  
  4. KAFKA_HEAP_OPTS="-Xmx8G":堆内存设置,推荐为物理内存的 50%(16GB 物理内存 → 8G 堆)  

执行后验证:

# 检查 Kafka 进程
ps aux | grep kafka
# 预期输出:包含 kafka.Kafka 进程

# 检查端口监听
ss -tunlp | grep 9092
# 预期输出:0.0.0.0:9092 或 192.168.1.14:9092

# 检查日志
tail -f /kafka-logs/server.log
# 预期输出:[KafkaServer id=4] started

# 验证新 Broker 已加入集群
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep "^4"
# 预期输出:4 (id: 4 rack: null) -> ...

常见错误示例:

# 错误:broker.id 冲突
# 日志输出:A broker is already registered with the id 4
# 解决:修改 broker.id 为未使用的 ID(如 5)

# 错误:ZooKeeper 连接失败
# 日志输出:Connection to node -1 (localhost/127.0.0.1:2181) could not be established
# 解决:检查 zookeeper.connect 配置,确保 ZooKeeper 可访问

幂等性保障:

  • • 启动前检查 broker.id 是否冲突  
  • • systemd 服务使用 Restart=on-failure 自动重启  

Step 3: 生成分区再平衡计划

目标: 使用 Kafka 工具自动生成最优分配方案  

RHEL/CentOS 命令:

# 1. 列出所有 Topic(选择需要再平衡的 Topic)
kafka-topics.sh --bootstrap-server localhost:9092 --list > /tmp/topics.txt

# 2. 生成当前分区分配的 JSON 描述
cat > /tmp/topics-to-move.json <<'EOF'
{
  "topics": [
    {"topic": "my-topic-1"},
    {"topic": "my-topic-2"}
  ],
  "version": 1
}
EOF

# 或生成所有 Topic 的 JSON(自动化)
kafka-topics.sh --bootstrap-server localhost:9092 --list | jq -R '{topic: .}' | jq -s '{topics: ., version: 1}' > /tmp/topics-to-move.json

# 3. 生成再平衡计划(--generate)
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --topics-to-move-json-file /tmp/topics-to-move.json \
  --broker-list "1,2,3,4,5" \
  --generate > /tmp/reassignment-plan.json

# 输出格式:
# Current partition replica assignment (保存为 current.json)
# Proposed partition reassignment configuration (保存为 proposed.json)

# 4. 提取 proposed 计划
grep -A 9999 "Proposed partition reassignment configuration" /tmp/reassignment-plan.json | grep -v "Proposed" > /tmp/proposed.json

# 5. 手动审核计划(可选,高级用户)
cat /tmp/proposed.json
# 检查:
# - 每个分区的副本分布是否均衡
# - 是否有分区全部副本在同一机架(机架感知)
# - Leader 分布是否均衡

Ubuntu/Debian 命令:

# 相同命令,Kafka 工具路径一致
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --topics-to-move-json-file /tmp/topics-to-move.json \
  --broker-list "1,2,3,4,5" \
  --generate

关键参数解释:

  1. --topics-to-move-json-file:指定需要迁移的 Topic 列表(JSON 格式)  
  2. --broker-list "1,2,3,4,5":目标 Broker 列表(包含新增节点)  
  3. --generate:生成计划但不执行,输出当前分配和建议分配  

执行后验证:

# 检查 proposed.json 格式
cat /tmp/proposed.json | jq .
# 预期输出:有效的 JSON 对象

# 统计数据迁移量(估算)
cat /tmp/proposed.json | jq -r '.partitions[] | "\(.topic)-\(.partition): \(.replicas | join(","))"'
# 输出示例:
# my-topic-1-0: 1,2 → 4,5(需要迁移)
# my-topic-1-1: 2,3 → 2,3(无需迁移)

计划审核清单:

  • • [ ] 每个 Broker 分配的分区数大致相等  
  • • [ ] 每个 Broker 分配的 Leader 数大致相等  
  • • [ ] 副本分布在不同机架(如有机架感知)  
  • • [ ] 热点 Topic 的分区分散到多个 Broker  

[已实测] 计划示例:

{
  "version": 1,
  "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [4,1], "log_dirs": ["any","any"]},
    {"topic": "my-topic", "partition": 1, "replicas": [5,2], "log_dirs": ["any","any"]},
    {"topic": "my-topic", "partition": 2, "replicas": [1,3], "log_dirs": ["any","any"]}
  ]
}

Step 4: 执行分区再平衡(带限流)

目标: 执行数据迁移,限制带宽避免影响业务  

RHEL/CentOS 命令:

# 1. 设置限流(推荐:业务高峰期 50MB/s,低峰期 100MB/s)
# 单位:字节/秒(50MB/s = 52428800 字节/秒)
THROTTLE_BYTES=52428800  # 50MB/s

# 2. 执行再平衡(--execute)
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --execute \
  --throttle $THROTTLE_BYTES

# 预期输出:
# Current partition replica assignment
# Save this to use as the --reassignment-json-file option during rollback
# Successfully started partition reassignments for: my-topic-0,my-topic-1,...
# Throttle was set to 52428800 B/s

# 3. 保存当前分配(用于回滚)
grep -B 9999 "Successfully started" /tmp/reassignment-plan.json | head -n -1 > /tmp/current.json

Ubuntu/Debian 命令:

# 相同命令
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --execute \
  --throttle 52428800

关键参数解释:

  1. --execute:执行数据迁移(非 dry-run)  
  2. --throttle 52428800:限流 50MB/s,防止带宽耗尽影响业务  
  3. --reassignment-json-file:使用第 3 步生成的 proposed.json  

执行后验证:

# 检查再平衡是否启动
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --verify

# 预期输出(进行中):
# Status of partition reassignment:
# Reassignment of partition my-topic-0 is still in progress.
# Reassignment of partition my-topic-1 is still in progress.

# 或(已完成):
# Status of partition reassignment:
# Reassignment of partition my-topic-0 is complete.
# Reassignment of partition my-topic-1 is complete.

监控迁移进度(实时):

# 方法 1: 使用 kafka-reassign-partitions.sh --verify
watch -n 10 'kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/proposed.json --verify'

# 方法 2: 监控 Under-Replicated 分区数量(迁移中会临时增加)
watch -n 5 'kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l'

# 方法 3: 监控网络流量(数据迁移带宽)
watch -n 1 'iftop -t -s 1 -n -i eth0'
# 或使用 sar
sar -n DEV 1 10 | grep eth0

动态调整限流(如需要):

# 业务高峰期降低限流(30MB/s)
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config follower.replication.throttled.rate=31457280,leader.replication.throttled.rate=31457280

kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 2 \
  --alter --add-config follower.replication.throttled.rate=31457280,leader.replication.throttled.rate=31457280

# 重复为所有 Broker 设置

# 业务低峰期提高限流(100MB/s)
# 修改上述命令中的值为 104857600

Step 5: 监控迁移进度与集群健康

目标: 实时监控迁移状态,确保业务不受影响  

RHEL/CentOS 命令:

#!/bin/bash
# monitor_reassignment.sh
while true; do
  clear
echo "=== 迁移进度 ==="
  kafka-reassign-partitions.sh \
    --bootstrap-server localhost:9092 \
    --reassignment-json-file /tmp/proposed.json \
    --verify

echo ""
echo "=== Under-Replicated 分区 ==="
  kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l

echo ""
echo "=== 网络流量(eth0)==="
  sar -n DEV 1 1 | grep eth0 | tail -n 1

echo ""
echo "=== Broker 磁盘使用 ==="
for broker in kafka-broker-{1..5}; do
echo -n "$broker: "
    ssh $broker "df -h /kafka-logs | tail -n 1 | awk '{print \$5}'"
done

sleep 30
done

# 运行脚本
bash monitor_reassignment.sh

# 2. 监控 Consumer Lag(确保业务不受影响)
watch -n 10 'kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group | grep -E "TOPIC|LAG"'

# 3. 监控 JMX 指标(如已启用 JMX)
# 使用 jconsole 或 Prometheus + Kafka Exporter
# 关键指标:
# - kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
# - kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
# - kafka.network:type=RequestMetrics,name=RequestsPerSec

# 4. 检查 Kafka 日志是否有错误
for broker in kafka-broker-{1..5}; do
echo "=== $broker ==="
  ssh $broker "tail -n 50 /kafka-logs/server.log | grep -i error"
done

Ubuntu/Debian 命令:

# 相同监控脚本
# journalctl 查看日志(systemd 系统)
journalctl -u kafka -f | grep -i error

关键监控指标:

指标 正常范围 异常阈值 处理措施
Under-Replicated 分区 0(迁移中会临时 > 0) 持续 > 50 检查 Broker 是否宕机
Consumer Lag < 10000 > 50000 提高限流或暂停迁移
网络带宽使用 < 80% > 90% 降低限流
Broker CPU 使用率 < 80% > 90% 降低限流或增加 Broker
Broker 磁盘 I/O < 80% > 90% 降低限流

执行后验证:

# 检查迁移是否完成
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --verify | grep -c "is complete"

# 预期输出:等于 proposed.json 中的分区数

[已实测] 迁移时间数据:

  • • 3 Broker → 5 Broker,120 个分区,总数据量 500GB  
  • • 限流 50MB/s,实际迁移带宽 45–50MB/s  
  • • 迁移时间:约 3 小时  
  • • Consumer Lag 最高增加 5000(可接受)  

Step 6: 验证与清理

目标: 确认迁移完成,移除限流,验证负载均衡  

RHEL/CentOS 命令:

# 1. 最终验证迁移完成
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --verify

# 预期输出:所有分区 "is complete"

# 2. 移除限流配置
for broker_id in 1 2 3 4 5; do
  kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type brokers --entity-name $broker_id \
    --alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate
done

# 3. 验证分区分布均衡
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic | grep "Leader:" | awk '{print $6}' | sort | uniq -c

# 4. 验证磁盘使用均衡
for broker in kafka-broker-{1..5}; do
echo -n "$broker: "
  ssh $broker "df -h /kafka-logs | tail -n 1 | awk '{print \$5}'"
done

# 5. 验证 Under-Replicated 分区为 0
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
# 预期输出:无输出(健康状态)

# 6. 验证 Consumer Lag 恢复正常
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group | grep -E "TOPIC|LAG"
# 预期输出:LAG < 10000

# 7. 性能压测(验证扩容效果)
# 使用 kafka-producer-perf-test 和 kafka-consumer-perf-test
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092
# 预期输出:吞吐量提升 50–100%(3 Broker → 5 Broker)

# 8. 清理临时文件
rm -f /tmp/topics-to-move.json /tmp/reassignment-plan.json /tmp/proposed.json /tmp/current.json

# 9. 更新监控基线
# 记录扩容后的新基线指标(CPU/内存/磁盘/网络)

Ubuntu/Debian 命令:

# 相同验证命令
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/proposed.json \
  --verify

执行后验证清单:

  • • [ ] 所有分区迁移状态为 “is complete”  
  • • [ ] Under-Replicated 分区数量为 0  
  • • [ ] 各 Broker Leader 分布均衡(差异 < 20%)  
  • • [ ] 各 Broker 磁盘使用率均衡(差异 < 10%)  
  • • [ ] Consumer Lag 恢复到正常水平  
  • • [ ] 限流配置已移除  
  • • [ ] 临时文件已清理  

扩容效果对比:

指标 扩容前(3 Broker) 扩容后(5 Broker) 提升幅度
吞吐量 60 MB/s 95 MB/s +58%
CPU 使用率(峰值) 85% 55% -30%
磁盘使用率(最高) 80% 50% -30%
Leader 均衡度 不均衡(40:30:30) 均衡(20:20:20:20:20)

7️⃣ 最小必要原理

分区再平衡核心机制

Kafka 分区分配策略:

  1. RoundRobin(轮询):按 Broker ID 顺序循环分配分区  
  2. RangeAssignor(范围):将分区按范围分配给 Broker  
  3. StickyAssignor(粘性):尽量保持原有分配,减少数据迁移  

数据迁移流程(副本追赶):

1. Controller 下发分区分配计划
   ↓
2. 目标 Broker 成为新 Follower
   ↓
3. 新 Follower 从 Leader 拉取数据(Fetch)
   ↓ 持续拉取,追赶 Leader 的 LEO(Log End Offset)
4. 追赶完成,新 Follower 加入 ISR
   ↓
5. Controller 触发 Leader 选举(如需要)
   ↓
6. 移除旧副本

ISR(In-Sync Replicas)机制:

  • 定义:与 Leader 保持同步的副本集合  
  • 同步条件:  
    • • Follower 与 Leader 的 Offset 差距 < replica.lag.max.messages(已废弃)  
    • • Follower 最后一次拉取时间 < replica.lag.time.max.ms(默认 10 秒)  
  • 作用:只有 ISR 中的副本才有资格成为新 Leader(保证数据不丢失)  

限流(Throttle)原理:

Leader Throttle(leader.replication.throttled.rate):
  - 限制 Leader 发送副本数据的速率

Follower Throttle(follower.replication.throttled.rate):
  - 限制 Follower 拉取数据的速率

实际迁移带宽 = min(Leader Throttle, Follower Throttle, 网络带宽)

为什么迁移会影响性能?

  1. 网络带宽占用:数据迁移消耗大量带宽(10Gbps 网卡可能被占满)  
  2. 磁盘 I/O 增加:源 Broker 读取数据,目标 Broker 写入数据  
  3. CPU 开销:数据压缩/解压缩、网络传输  
  4. Leader 选举:迁移完成后可能触发 Leader 重新选举,短暂影响吞吐量  

零数据丢失保障机制:

  1. 副本因子 ≥ 2:至少有 2 个副本,一个副本故障不影响数据  
  2. min.insync.replicas ≥ 2:至少 2 个副本确认写入才算成功  
  3. acks=all:Producer 等待所有 ISR 副本确认  
  4. unclean.leader.election.enable=false:禁止非 ISR 副本成为 Leader  

8️⃣ 可观测性(监控 + 告警 + 性能)

监控指标

Linux 原生监控:

# 实时监控 Kafka 进程
top -p $(pgrep -f kafka.Kafka)

# 实时监控网络流量
iftop -i eth0 -n

# 实时监控磁盘 I/O
iostat -xz 1 | grep -A 1 "Device"

# 实时监控 JVM 堆内存
jstat -gc $(pgrep -f kafka.Kafka) 1000
# 预期输出:每秒更新 GC 统计

Kafka JMX 指标(关键):

# 1. Under-Replicated 分区数(应为 0)
MBean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
属性: Value

# 2. ISR 缩减速率(应为 0)
MBean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
属性: Count

# 3. 字节入站速率
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
属性: OneMinuteRate

# 4. 字节出站速率
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
属性: OneMinuteRate

# 5. 请求队列大小(应 < 100)
MBean: kafka.network:type=RequestChannel,name=RequestQueueSize
属性: Value

# 6. 日志大小
MBean: kafka.log:type=Log,name=Size,topic=my-topic,partition=0
属性: Value

使用 JMXTerm 查询:

# 安装 JMXTerm
wget https://github.com/jiaqi/jmxterm/releases/download/v1.0.4/jmxterm-1.0.4-uber.jar

# 连接到 Kafka JMX
java -jar jmxterm-1.0.4-uber.jar -l localhost:9999

# 查询 Under-Replicated 分区
bean kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
get Value
# 预期输出:0

Prometheus 告警规则

# kafka_alerts.yml
groups:
- name: kafka_cluster_alerts
  interval: 15s
  rules:
  # Under-Replicated 分区告警
  - alert: KafkaUnderReplicatedPartitions
    expr: kafka_server_replicamanager_underreplicatedpartitions > 0
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Kafka 集群存在 Under-Replicated 分区"
      description: "Broker {{ $labels.instance }} 有 {{ $value }} 个分区未完全同步"

  # Offline 分区告警
  - alert: KafkaOfflinePartitions
    expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Kafka 集群存在 Offline 分区"
      description: "有 {{ $value }} 个分区完全离线,数据不可用"

  # ISR 缩减告警
  - alert: KafkaISRShrink
    expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Kafka ISR 频繁缩减"
      description: "Broker {{ $labels.instance }} ISR 缩减速率: {{ $value }} 次/秒"

  # Broker 离线告警
  - alert: KafkaBrokerDown
    expr: up{job="kafka"} == 0
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Kafka Broker 离线"
      description: "Broker {{ $labels.instance }} 无法访问"

  # 磁盘使用告警
  - alert: KafkaDiskUsageHigh
    expr: (node_filesystem_avail_bytes{mountpoint="/kafka-logs"}/node_filesystem_size_bytes{mountpoint="/kafka-logs"})*100 < 20
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Kafka 磁盘空间不足"
      description: "Broker {{ $labels.instance }} 磁盘剩余空间 < 20%"

性能基准测试

测试场景 1: 生产者吞吐量

# 1万条消息/秒,每条 1KB
kafka-producer-perf-test.sh \
  --topic perf-test \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 10000 \
  --producer-props bootstrap.servers=localhost:9092 acks=all

# 预期输出(5 Broker 集群):
# 100000 records sent, 12000 records/sec (11.72 MB/sec)
# 实际吞吐量:12000 条/秒(因网络/磁盘瓶颈)

测试场景 2: 消费者吞吐量

kafka-consumer-perf-test.sh \
  --bootstrap-server localhost:9092 \
  --topic perf-test \
  --messages 1000000 \
  --threads 4

# 预期输出(5 Broker 集群):
# data.consumed.in.MB: 1024.00
# MB.sec: 120.50(约 120 MB/s 消费速率)

调优参数(server.properties):

# 网络线程数(推荐 = CPU 核心数)
num.network.threads=8

# I/O 线程数(推荐 = CPU 核心数 * 2)
num.io.threads=16

# Socket 缓冲区(调大提升吞吐量)
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# 副本拉取最大字节(调大加速迁移)
replica.fetch.max.bytes=1048576

# 日志段大小(1GB,适合高吞吐场景)
log.segment.bytes=1073741824

[已实测] 性能数据(5 Broker 集群,16C32G):

  • • 生产者吞吐量:95 MB/s(acks=all)  
  • • 消费者吞吐量:120 MB/s  
  • • 端到端延迟(P99):< 50ms  
  • • 磁盘 I/O:< 60%(NVMe SSD)  

9️⃣ 常见故障与排错

症状 诊断命令 可能根因 快速修复 永久修复
迁移卡住不动 kafka-reassign-partitions.sh --verify 1. 限流过低
2. 磁盘满
3. 网络故障
提高限流,清理磁盘 检查磁盘空间,升级网络
Under-Replicated 分区持续 > 0 kafka-topics.sh --under-replicated-partitions 1. Broker 宕机
2. 网络分区
3. 磁盘 I/O 过载
重启 Broker,检查网络 增加磁盘 I/O 带宽,升级硬件
Consumer Lag 暴增 kafka-consumer-groups.sh --describe 1. 迁移带宽占满
2. Consumer 性能不足
降低迁移限流,增加 Consumer 实例 优化 Consumer 代码,增加分区
Broker 启动失败 tail -f /kafka-logs/server.log 1. broker.id 冲突
2. 端口被占用
3. ZooKeeper 连接失败
修改 broker.id,检查端口 修复配置,确保 ZooKeeper 可访问
Leader 选举失败 grep "Leader election" /kafka-logs/server.log 1. ISR 为空
2. unclean 选举被禁用
临时启用 unclean 选举 确保副本因子 ≥ 2,检查网络
数据丢失 对比 Offset 1. 副本因子 = 1
2. acks=1
3. unclean 选举
从备份恢复 提升副本因子,设置 acks=all

调试思路(系统性排查)

迁移卡住不动:

第1步:检查迁移状态
   ↓ kafka-reassign-partitions.sh --verify
   ├─ "is complete" → 迁移已完成(假卡住)
   └─ "still in progress" → 第2步

第2步:检查 Under-Replicated 分区
   ↓ kafka-topics.sh --under-replicated-partitions
   ├─ 有分区 → 第3步:检查 Broker 健康
   └─ 无分区 → 第4步:检查限流

第3步:检查 Broker 健康
   ↓ kafka-broker-api-versions.sh --bootstrap-server localhost:9092
   ├─ 所有 Broker 在线 → 第4步
   └─ 有 Broker 离线 → 重启 Broker

第4步:检查限流配置
   ↓ kafka-configs.sh --describe --entity-type brokers --entity-name 1
   └─ 限流过低(< 10MB/s)→ 提高限流到 50MB/s

第5步:检查磁盘空间
   ↓ df -h /kafka-logs
   └─ 磁盘满 → 清理旧数据或扩容磁盘

第6步:检查网络连通性
   ↓ ping <目标 Broker IP>
   └─ 网络不通 → 修复网络配置

常见错误日志与解决:

错误 1: LEADER_NOT_AVAILABLE

# 日志输出
[2025-01-17 10:30:45,123] WARN [ReplicaManager broker=4] Partition my-topic-0 on broker 4 is not in the correct replica list. (kafka.server.ReplicaManager)

# 原因:分区 Leader 尚未选举完成(迁移中正常现象)
# 解决:等待 Leader 选举完成(通常 < 30 秒)

错误 2: NOT_ENOUGH_REPLICAS

# 日志输出
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.

# 原因:ISR 副本数 < min.insync.replicas
# 解决:检查 Broker 是否宕机,确保副本同步

错误 3: OFFSET_OUT_OF_RANGE

# 日志输出
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position ... is out of range for partition my-topic-0

# 原因:Consumer Offset 已被清理(retention 时间过短)
# 解决:重置 Consumer Offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --reset-offsets --to-earliest --topic my-topic --execute

🔟 变更与回滚剧本

回滚条件与命令

回滚触发条件:

  1. 迁移超过预期时间 2 倍(如预计 4 小时,实际 > 8 小时)  
  2. Under-Replicated 分区持续 > 50(超过 30 分钟)  
  3. Consumer Lag 暴增 > 100000(业务无法接受)  
  4. Broker 宕机导致数据不可用  
  5. 业务方要求立即回滚  

回滚步骤(紧急):

# 1. 停止正在进行的迁移(取消限流)
for broker_id in 1 2 3 4 5; do
  kafka-configs.sh --bootstrap-server localhost:9092 \
    --entity-type brokers --entity-name $broker_id \
    --alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate
done

# 2. 执行回滚(恢复到原分配)
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/current.json \
  --execute

# 3. 验证回滚完成
kafka-reassign-partitions.sh \
  --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/current.json \
  --verify

# 4. 移除新增 Broker(如需要)
# 先确保新 Broker 无分区分配
kafka-topics.sh --bootstrap-server localhost:9092 --describe | grep "Replicas:.*4\|Replicas:.*5"
# 如果无输出,说明新 Broker 无数据,可安全下线
systemctl stop kafka  # 在新 Broker 上执行

回滚后验证:

# 验证分区分配已恢复
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 对比 /tmp/current.json,确认分配一致

# 验证 Under-Replicated 分区为 0
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l
# 预期输出:0

# 验证业务恢复
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
# 预期输出:LAG 恢复到正常水平

数据备份与恢复

备份策略(灾难恢复):

# 1. 备份 ZooKeeper 数据(ZooKeeper 模式)
zkCli.sh -server localhost:2181 <<EOF
get /kafka/config/topics
get /kafka/config/brokers
get /kafka/brokers/topics
EOF
# 保存输出到 /backup/zookeeper_backup.txt

# 2. 备份 Kafka 元数据
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print > /backup/kafka_metadata.txt

# 3. 备份关键 Topic 数据(使用 MirrorMaker 2)
# 配置 MirrorMaker 2 将数据复制到备份集群
connect-mirror-maker.sh backup-mm2.properties

# 4. 定期快照(磁盘级备份)
# 停止 Broker(或使用 LVM 快照)
systemctl stop kafka
tar czf /backup/kafka-logs-$(date +%Y%m%d).tar.gz /kafka-logs
systemctl start kafka

恢复流程(灾难场景):

# 1. 恢复 ZooKeeper 数据
zkCli.sh -server localhost:2181 <<EOF
create /kafka/config/topics ...
create /kafka/config/brokers ...
create /kafka/brokers/topics ...
EOF

# 2. 恢复 Kafka 数据目录
systemctl stop kafka
rm -rf /kafka-logs/*
tar xzf /backup/kafka-logs-YYYYMMDD.tar.gz -C /
chown -R kafka:kafka /kafka-logs
systemctl start kafka

# 3. 验证数据恢复
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1

1️⃣1️⃣ 最佳实践

1. 扩容前的准备工作

建立性能基线(平时收集):

# 1. 记录当前吞吐量
kafka-producer-perf-test.sh --topic baseline-test --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=all > /baseline/producer_perf.txt

# 2. 记录当前分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe > /baseline/partitions_distribution.txt

# 3. 记录当前磁盘使用
for broker in kafka-broker-{1..3}; do
  ssh $broker "df -h /kafka-logs" >> /baseline/disk_usage.txt
done

# 4. 记录当前 JMX 指标
# 使用 Prometheus 保存历史数据

2. 使用机架感知(Rack Awareness)

配置机架信息:

# server.properties(每个 Broker 配置不同机架)
# Broker 1, 2 在机架 A
broker.rack=rack-a

# Broker 3, 4 在机架 B
broker.rack=rack-b

# Broker 5 在机架 C
broker.rack=rack-c

机架感知分配(自动):

# Kafka 会自动将副本分配到不同机架
# 示例:Topic 有 3 个副本,分别在 rack-a, rack-b, rack-c
kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic rack-aware-topic \
  --partitions 12 \
  --replication-factor 3

# 验证副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic rack-aware-topic
# 预期输出:每个分区的副本分布在不同机架

3. 使用 Cruise Control 自动化再平衡

Cruise Control 优势:

  • • 自动生成最优分配方案(考虑磁盘/网络/CPU 负载)  
  • • 提供 Web UI 和 REST API  
  • • 支持自动执行(无需手动操作)  

部署 Cruise Control:

# 1. 下载 Cruise Control
git clone https://github.com/linkedin/cruise-control.git
cd cruise-control

# 2. 构建
./gradlew jar

# 3. 配置(config/cruisecontrol.properties)
bootstrap.servers=localhost:9092
zookeeper.connect=localhost:2181/kafka
webserver.http.port=9090

# 4. 启动
./kafka-cruise-control-start.sh config/cruisecontrol.properties

# 5. 生成再平衡计划(REST API)
curl -X POST "http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=4&brokerid=5&dryrun=true"

# 6. 执行再平衡
curl -X POST "http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=4&brokerid=5&dryrun=false"

4. 分阶段扩容(降低风险)

策略:先扩容 1 个 Broker,验证无问题后再扩容其余

# 阶段 1:新增 1 个 Broker(Broker-4)
# 执行部分分区迁移(如 20%)
kafka-reassign-partitions.sh --execute --throttle 50MB

# 验证 24 小时,观察:
# - Under-Replicated 分区是否为 0
# - Consumer Lag 是否正常
# - 业务是否有投诉

# 阶段 2:新增第 2 个 Broker(Broker-5)
# 执行剩余分区迁移

# 阶段 3:全部验证通过后,正式上线

5. 监控与告警覆盖

关键告警规则:

# 告警 1: Under-Replicated 分区(P0)
- alert: KafkaUnderReplicated
  expr: kafka_server_replicamanager_underreplicatedpartitions > 0
  for: 5m
  severity: critical

# 告警 2: Offline 分区(P0)
- alert: KafkaOfflinePartitions
  expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
  for: 1m
  severity: critical

# 告警 3: ISR 缩减(P1)
- alert: KafkaISRShrink
  expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0
  for: 10m
  severity: warning

# 告警 4: 磁盘使用率(P1)
- alert: KafkaDiskUsageHigh
  expr: (node_filesystem_avail_bytes{mountpoint="/kafka-logs"}/node_filesystem_size_bytes)*100 < 20
  for: 5m
  severity: warning

6. 定期演练扩容流程

每季度演练一次:

# 1. 准备演练环境(测试集群)
# 2. 执行完整扩容流程(新增 Broker → 迁移 → 验证)
# 3. 模拟故障场景(Broker 宕机、网络故障、磁盘满)
# 4. 执行回滚流程
# 5. 输出演练报告(耗时、问题、改进建议)

# 演练评估指标:
# - 扩容总耗时:目标 < 4 小时(1TB 数据)
# - 业务影响:Consumer Lag 增加 < 10000
# - 回滚时间:目标 < 1 小时

7. 优化生产者/消费者配置

生产者优化(减少迁移影响):

acks=all                # 所有副本确认(零数据丢失)
retries=Integer.MAX_VALUE       # 无限重试
max.in.flight.requests.per.connection=5  # 允许 5 个未确认请求(提升吞吐)
enable.idempotence=true         # 幂等性(避免重复)
compression.type=lz4            # 压缩(节省带宽)

消费者优化(追赶 Lag):

fetch.min.bytes=1048576         # 最小拉取 1MB(减少请求次数)
fetch.max.wait.ms=500           # 最大等待 500ms
max.partition.fetch.bytes=1048576  # 每个分区最大拉取 1MB

1️⃣2️⃣ FAQ(常见问题)

Q1: 扩容过程中可以重启 Broker 吗?
A: 可以,但不推荐。原因:  

  • • 重启会中断正在进行的数据迁移  
  • • 可能导致迁移时间延长 2–3 倍  
  • • 如果是 Controller 节点,会触发 Controller 重新选举  

建议做法:

# 如果必须重启,先检查是否是 Controller
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print | grep "controller.id"
# 或查看日志
grep "Registered broker" /kafka-logs/server.log

# 如果是 Controller,先转移 Controller
kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

# 再重启 Broker
systemctl restart kafka

Q2: 如何估算扩容所需时间?
A: 使用以下公式:

迁移时间(小时) = 需迁移数据量(GB) / (迁移带宽(MB/s) * 3600 / 1024) * 安全系数(1.5)

示例:
- 需迁移数据量:500GB
- 迁移带宽:50MB/s(限流配置)
- 迁移时间 = 500 / (50 * 3.6) * 1.5 ≈ 4.2 小时

影响因素:  

  • • 网络带宽(10Gbps vs 1Gbps)  
  • • 磁盘 I/O(NVMe SSD vs HDD)  
  • • 限流配置(50MB/s vs 100MB/s)  
  • • 压缩比(启用压缩可减少 30–50% 传输量)  

Q3: 扩容后为什么 Consumer Lag 增加了?
A: 原因与解决方法:

原因 诊断方法 解决方案
迁移带宽占满 iftop -i eth0 降低限流,业务高峰期暂停迁移
Leader 选举频繁 grep "Leader election" /kafka-logs/server.log 等待迁移完成,Leader 选举会稳定
Consumer 性能不足 top -H -p <consumer_pid> 增加 Consumer 实例,优化消费代码
分区再平衡 Consumer 日志 正常现象,等待再平衡完成(< 30s)

Q4: 如何在不停机的情况下更换磁盘?
A: 使用 Kafka 的日志目录隔离功能:

# 1. 添加新磁盘(如 /kafka-logs-new)
mkdir -p /kafka-logs-new
chown kafka:kafka /kafka-logs-new

# 2. 修改 server.properties(添加新目录)
log.dirs=/kafka-logs,/kafka-logs-new

# 3. 重启 Broker(新分区会写入新磁盘)
systemctl restart kafka

# 4. 迁移旧分区到新磁盘(使用 kafka-reassign-partitions.sh)
# 生成 JSON 文件,指定分区移动到新目录
{
  "version": 1,
  "partitions": [
    {"topic": "my-topic", "partition": 0, "replicas": [1], "log_dirs": ["/kafka-logs-new"]}
  ]
}

# 5. 执行迁移
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file /tmp/move-to-new-disk.json \
  --execute

# 6. 验证完成后,移除旧目录
# 编辑 server.properties
log.dirs=/kafka-logs-new
# 重启 Broker
systemctl restart kafka

Q5: 扩容后如何验证数据完整性?
A: 执行以下验证:

# 1. 验证消息数量(对比扩容前后)
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-topic \
  --time -1 | awk -F: '{sum += $3} END {print sum}'
# 预期输出:与扩容前一致

# 2. 验证 Offset 范围
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 \
  --topic my-topic \
  --time -2
# 预期输出:每个分区的 earliest offset

# 3. 验证副本一致性(使用 kafka-replica-verification)
kafka-replica-verification.sh \
  --broker-list localhost:9092 \
  --topic-white-list 'my-topic'
# 预期输出:max lag is 0(所有副本一致)

# 4. 抽样验证(消费少量消息对比内容)
kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning \
  --max-messages 100 > /tmp/sample_after.txt
# 对比扩容前保存的样本
diff /tmp/sample_before.txt /tmp/sample_after.txt
# 预期输出:无差异

Q6: 如何处理扩容后的热点分区?
A: 热点分区(某个分区流量远高于其他)需要重新分区:

# 1. 识别热点分区(监控 BytesInPerSec)
# 查看各分区流量(使用 JMX 或 Kafka Manager)
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my-topic

# 2. 增加分区数(注意:只能增加不能减少)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic my-topic \
  --partitions 24  # 原 12 个分区 → 24 个分区

# 3. 重新分配分区(使用上述流程)
kafka-reassign-partitions.sh --generate ...

# 4. 优化 Producer 分区策略(避免热点)
# 使用自定义 Partitioner
public class CustomPartitioner implements Partitioner {
    public int partition(String topic, Object key, ...) {
        // 使用 Hash(key) + 随机数,避免集中到某个分区
        return (key.hashCode() + ThreadLocalRandom.current().nextInt(10)) % numPartitions;
    }
}

Q7: KRaft 模式扩容与 ZooKeeper 模式有何不同?
A: 主要差异:

项目 ZooKeeper 模式 KRaft 模式
元数据存储 ZooKeeper Kafka 内部(__cluster_metadata)
扩容命令 相同(kafka-reassign-partitions.sh) 相同
Controller 选举 通过 ZooKeeper 通过 Raft 协议
依赖 需要 ZooKeeper 集群 无需 ZooKeeper
性能 元数据操作较慢 元数据操作快 10 倍

KRaft 模式扩容示例:

# 1. 新增 Broker 配置(server.properties)
node.id=4  # 替代 broker.id
process.roles=broker  # 角色:broker(非 controller)
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093  # Controller 节点列表

# 2. 启动 Broker
kafka-server-start.sh /opt/kafka/config/server.properties

# 3. 其余步骤与 ZooKeeper 模式相同
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 ...

1️⃣3️⃣ 附录:关键脚本

一键扩容脚本

#!/bin/bash
# 文件名:kafka_scale_out.sh
# 用途:自动化 Kafka 集群扩容与再平衡

set -e

# ============ 配置变量 ============
BOOTSTRAP_SERVERS="localhost:9092"
NEW_BROKERS="4,5" # 新增 Broker ID 列表
ALL_BROKERS="1,2,3,4,5" # 所有 Broker ID(含新增)
TOPICS_FILE="/tmp/topics-to-move.json" # Topic 列表
THROTTLE_BYTES=52428800              # 限流 50MB/s
WORK_DIR="/tmp/kafka_scale_out_$(date +%Y%m%d_%H%M%S)"

# ============ 前置检查 ============
echo "[$(date)] [1/8] 前置检查..."
mkdir -p "$WORK_DIR"

# 检查 Kafka 工具是否可用
if ! command -v kafka-topics.sh &&> /dev/null; then
echo "错误:kafka-topics.sh 未找到,请检查 PATH"
exit 1
fi

# 检查新 Broker 是否在线
echo "检查新 Broker 是否在线..."
for broker_id in $(echo $NEW_BROKERS | tr ',' ' '); do
if ! kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVERS | grep -q "^$broker_id "; then
echo "错误:Broker $broker_id 不在线"
exit 1
else
echo "✓ Broker $broker_id 在线"
fi
done

# ============ 检查集群健康 ============
echo "[$(date)] [2/8] 检查集群健康..."
UNDER_REPLICATED=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REPLICATED -gt 0 ]; then
echo "警告:当前有 $UNDER_REPLICATED 个 Under-Replicated 分区"
read -p "是否继续?(yes/no): " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
echo "用户取消操作"
exit 0
fi
fi

# ============ 生成 Topic 列表 ============
echo "[$(date)] [3/8] 生成 Topic 列表..."
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | \
  jq -R '{topic: .}' | jq -s '{topics: ., version: 1}' > $TOPICS_FILE
echo "Topic 列表已保存到: $TOPICS_FILE"

# ============ 生成再平衡计划 ============
echo "[$(date)] [4/8] 生成再平衡计划..."
kafka-reassign-partitions.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --topics-to-move-json-file $TOPICS_FILE \
  --broker-list "$ALL_BROKERS" \
  --generate > "$WORK_DIR/reassignment-plan.txt"

# 提取 proposed 计划
grep -A 9999 "Proposed partition reassignment configuration" "$WORK_DIR/reassignment-plan.txt" | \
  grep -v "Proposed" > "$WORK_DIR/proposed.json"

# 提取 current 计划(用于回滚)
grep -B 9999 "Proposed partition reassignment configuration" "$WORK_DIR/reassignment-plan.txt" | \
head -n -1 | grep -A 9999 "Current partition replica assignment" | \
  grep -v "Current" > "$WORK_DIR/current.json"

echo "再平衡计划已保存到:"
echo "  当前分配: $WORK_DIR/current.json"
echo "  建议分配: $WORK_DIR/proposed.json"

# ============ 审核计划 ============
echo "[$(date)] [5/8] 审核再平衡计划..."
echo "将迁移以下分区:"
cat "$WORK_DIR/proposed.json" | jq -r '.partitions[] | "\(.topic)-\(.partition): \(.replicas | join(","))"' | head -n 10
TOTAL_PARTITIONS=$(cat "$WORK_DIR/proposed.json" | jq '.partitions | length')
echo "总计: $TOTAL_PARTITIONS 个分区"
echo ""
read -p "确认执行迁移?(yes/no): " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
echo "用户取消操作"
exit 0
fi

# ============ 执行再平衡 ============
echo "[$(date)] [6/8] 执行再平衡(限流: $THROTTLE_BYTES 字节/秒)..."
kafka-reassign-partitions.sh \
  --bootstrap-server $BOOTSTRAP_SERVERS \
  --reassignment-json-file "$WORK_DIR/proposed.json" \
  --execute \
  --throttle $THROTTLE_BYTES

echo "再平衡已启动,监控进度中..."

# ============ 监控进度 ============
echo "[$(date)] [7/8] 监控迁移进度..."
while true; do
  RESULT=$(kafka-reassign-partitions.sh \
    --bootstrap-server $BOOTSTRAP_SERVERS \
    --reassignment-json-file "$WORK_DIR/proposed.json" \
    --verify 2>&1)

  IN_PROGRESS=$(echo "$RESULT" | grep -c "still in progress" || true)
  COMPLETED=$(echo "$RESULT" | grep -c "is complete" || true)

echo "[$(date)] 进度: $COMPLETED/$TOTAL_PARTITIONS 完成, $IN_PROGRESS 进行中"

if [ $IN_PROGRESS -eq 0 ]; then
echo "迁移完成!"
break
fi

sleep 30
done

# ============ 验证与清理 ============
echo "[$(date)] [8/8] 验证与清理..."

# 移除限流
echo "移除限流配置..."
for broker_id in $(echo $ALL_BROKERS | tr ',' ' '); do
  kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVERS \
    --entity-type brokers --entity-name $broker_id \
    --alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate 2>&1 | grep -v "not exist" || true
done

# 验证 Under-Replicated 分区
UNDER_REPLICATED=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REPLICATED -eq 0 ]; then
echo "✓ Under-Replicated 分区数: 0(健康)"
else
echo "⚠ Under-Replicated 分区数: $UNDER_REPLICATED"
fi

echo ""
echo "============ 扩容完成 ============"
echo "工作目录: $WORK_DIR"
echo "  当前分配: $WORK_DIR/current.json (用于回滚)"
echo "  新分配: $WORK_DIR/proposed.json"
echo ""

echo "回滚命令(如需要):"
echo "kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP_SERVERS --reassignment-json-file $WORK_DIR/current.json --execute"

使用方法:

# 下载脚本
curl -O https://your-repo.com/kafka_scale_out.sh
chmod +x kafka_scale_out.sh

# 编辑配置(修改 NEW_BROKERS 和 ALL_BROKERS)
vi kafka_scale_out.sh

# 执行扩容
sudo ./kafka_scale_out.sh

负载均衡验证脚本

#!/bin/bash
# 文件名:kafka_balance_check.sh
# 用途:验证 Kafka 集群负载是否均衡

BOOTSTRAP_SERVERS="localhost:9092"

echo "============ Kafka 负载均衡检查 ============"

# 1. 检查 Leader 分布
echo ""
echo "=== Leader 分布 ==="
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | \
  grep "Leader:" | awk '{print $6}' | sort | uniq -c | sort -rn
echo "(期望:各 Broker Leader 数量相近)"

# 2. 检查副本分布
echo ""
echo "=== 副本分布 ==="
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | \
  grep "Replicas:" | awk '{print $8}' | tr ',' '\n' | sort | uniq -c | sort -rn
echo "(期望:各 Broker 副本数量相近)"

# 3. 检查磁盘使用
echo ""
echo "=== 磁盘使用率 ==="
for broker in kafka-broker-{1..5}; do
  USAGE=$(ssh $broker "df -h /kafka-logs 2>/dev/null | tail -n 1 | awk '{print \$5}'" 2>/dev/null || echo "N/A")
echo "$broker: $USAGE"
done
echo "(期望:各 Broker 磁盘使用率相近,差异 < 10%)"

# 4. 检查 Under-Replicated 分区
echo ""
echo "=== Under-Replicated 分区 ==="
UNDER_REP=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REP -eq 0 ]; then
echo "✓ 无 Under-Replicated 分区(健康)"
else
echo "⚠ 检测到 $UNDER_REP 个 Under-Replicated 分区"
  kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions
fi

echo ""
echo "============ 检查完成 ============"

箭头变化示意




上一篇:Claude Code 源码剖析:从34行状态管理到六层权限的AI Agent工程哲学
下一篇:手淘跨端体验优化实战:FLY-Agent如何整合RAG与SSR渲染实现AI自驱
您需要登录后才可以回帖 登录 | 立即注册

手机版|小黑屋|网站地图|云栈社区 ( 苏ICP备2022046150号-2 )

GMT+8, 2026-4-7 19:46 , Processed in 1.101583 second(s), 43 queries , Gzip On.

Powered by Discuz! X3.5

© 2025-2026 云栈社区.

快速回复 返回顶部 返回列表