1️⃣ 适用场景 & 前置条件
| 项目 |
要求 |
| 适用场景 |
Kafka 集群容量不足、性能瓶颈、新增 Broker 节点、分区负载不均、数据迁移 |
| OS |
RHEL/CentOS 7.9+, Ubuntu 20.04+ |
| 内核 |
Linux Kernel 4.18+ |
| Kafka 版本 |
Kafka 2.8.0+ (推荐 3.4.0+, 支持 KRaft 模式) |
| Java 版本 |
OpenJDK 11+ 或 Oracle JDK 11+ |
| ZooKeeper |
3.6.0+ (如使用 ZooKeeper 模式) |
| 资源规格 |
新增 Broker: 8C16G / 500GB SSD (推荐 1TB NVMe SSD) |
| 网络 |
≥10Gbps 网卡(数据迁移带宽密集) |
| 权限 |
Kafka 用户权限,sudo 权限(安装依赖) |
| 技能要求 |
熟悉 Kafka 架构、分区副本机制、Linux 命令、Shell 脚本 |
| 业务影响 |
需评估业务高峰期,建议低峰期执行(数据迁移影响性能) |
2️⃣ 反模式警告(何时不适用)
⚠️ 以下场景不推荐使用本方案:
- 单 Broker 集群:无法做分区再平衡,应先扩容到 3 Broker 再执行
- 副本因子 = 1 的集群:数据无冗余,迁移过程中存在丢失风险,应先提升副本因子到 2+
- 老版本 Kafka (< 2.4):缺乏
kafka-reassign-partitions.sh 高级功能,建议先升级
- 磁盘故障场景:应先修复故障 Broker,而非扩容
- 网络带宽不足 (< 1Gbps):数据迁移会耗尽带宽,影响业务
- 未监控集群状态:无法评估扩容效果,应先部署 Kafka Manager/Cruise Control
替代方案对比:
| 场景 |
推荐方案 |
理由 |
| 临时流量激增 |
增加 Consumer 实例 |
无需扩容 Broker,快速响应 |
| 单个 Topic 热点 |
增加分区数 |
提升并行度,无需整体扩容 |
| 数据保留过长 |
调整 retention.ms |
释放磁盘空间,避免扩容 |
| 磁盘 I/O 瓶颈 |
更换 SSD/NVMe |
比扩容更直接解决问题 |
3️⃣ 环境与版本矩阵
| 组件 |
RHEL/CentOS |
Ubuntu/Debian |
测试状态 |
| OS 版本 |
RHEL 8.7+ / CentOS Stream 9 |
Ubuntu 22.04 LTS |
[已实测] |
| 内核版本 |
4.18.0-425+ |
5.15.0-60+ |
[已实测] |
| Kafka |
2.8.2 (ZooKeeper) / 3.4.0 (KRaft) |
2.8.2 (ZooKeeper) / 3.4.0 (KRaft) |
[已实测] |
| ZooKeeper |
3.6.4 / 3.8.1 |
3.6.4 / 3.8.1 |
[已实测] |
| Java |
OpenJDK 11.0.18+ / 17.0.6+ |
OpenJDK 11.0.18+ / 17.0.6+ |
[已实测] |
| 最小规格 |
8C16G / 500GB SSD |
8C16G / 500GB SSD |
— |
| 推荐规格 |
16C32G / 1TB NVMe SSD |
16C32G / 1TB NVMe SSD |
— |
| 网络带宽 |
10Gbps |
10Gbps |
— |
版本差异说明:
- Kafka 2.8 vs 3.4:3.4 支持 KRaft 模式(无需 ZooKeeper),生产可用
- ZooKeeper 3.6 vs 3.8:3.8 支持 TLS 加密、审计日志
- Java 11 vs 17:17 性能提升 10–15%,但 Kafka 3.0+ 才完全支持
4️⃣ 阅读导航
📖 建议阅读路径:
- 快速上手(30分钟 - 自动化扩容):→ 章节 5(快速清单) → 章节 6(实施步骤 Step 1–6) → 章节 13(关键脚本)
- 深入理解(90分钟 - 手动精细控制):→ 章节 7(分区再平衡原理) → 章节 6(实施步骤完整版) → 章节 8(性能调优) → 章节 11(最佳实践)
- 故障排查:→ 章节 9(常见故障与排错) → 章节 10(回滚与数据恢复)
5️⃣ 快速清单(Checklist)
-
• [ ] 准备阶段(执行前 1 天)
- • [ ] 评估集群当前负载(CPU/内存/磁盘/网络)
- • [ ] 检查集群健康状态(所有 Broker 在线,无 Under-Replicated 分区)
- • [ ] 备份 ZooKeeper 数据(
zkCli.sh 导出配置)
- • [ ] 准备新增 Broker 节点(安装 Kafka,配置文件)
- • [ ] 通知业务方维护窗口(数据迁移影响性能)
-
• [ ] 扩容阶段(执行中)
- • [ ] 启动新增 Broker 节点(
kafka-server-start.sh)
- • [ ] 验证新 Broker 已加入集群(
kafka-broker-api-versions.sh)
- • [ ] 生成分区再平衡计划(
kafka-reassign-partitions.sh --generate)
- • [ ] 审核再平衡计划(检查数据迁移量、预估时间)
- • [ ] 执行分区再平衡(
kafka-reassign-partitions.sh --execute)
-
• [ ] 监控阶段(执行中)
- • [ ] 实时监控迁移进度(
kafka-reassign-partitions.sh --verify)
- • [ ] 监控集群性能指标(CPU/网络/磁盘 I/O)
- • [ ] 监控 Consumer Lag(确保业务不受影响)
- • [ ] 监控 Under-Replicated 分区数量(应为 0)
-
• [ ] 验证阶段(执行后)
- • [ ] 确认分区再平衡完成(所有分区已迁移)
- • [ ] 验证数据完整性(Topic 消息数、Offset 范围)
- • [ ] 验证负载均衡效果(各 Broker CPU/磁盘/网络均衡)
- • [ ] 执行压测(验证集群性能提升)
-
• [ ] 清理阶段(执行后)
- • [ ] 删除临时文件(再平衡计划 JSON 文件)
- • [ ] 更新监控基线(新的 Broker 指标)
- • [ ] 输出扩容报告(扩容前后对比、业务影响)
6️⃣ 实施步骤
架构与数据流说明(文字描述)
Kafka 集群扩容架构:
扩容前(3 Broker):
Topic: my-topic (12 分区, 副本因子 2)
Broker-1: 分区 0,1,2,3 (Leader) + 副本
Broker-2: 分区 4,5,6,7 (Leader) + 副本
Broker-3: 分区 8,9,10,11 (Leader) + 副本
→ 负载不均:Broker-1 磁盘 80%, Broker-2/3 仅 40%
扩容后(5 Broker):
Broker-1: 分区 0,1,2 (Leader) + 副本
Broker-2: 分区 3,4,5 (Leader) + 副本
Broker-3: 分区 6,7,8 (Leader) + 副本
Broker-4: 分区 9,10 (Leader) + 副本 (新增)
Broker-5: 分区 11 (Leader) + 副本 (新增)
→ 负载均衡:所有 Broker 磁盘约 50%
分区再平衡数据流:
1. 生成再平衡计划
↓
kafka-reassign-partitions.sh --generate
↓ 输出 JSON 文件:指定每个分区的目标 Broker
2. 执行再平衡
↓
kafka-reassign-partitions.sh --execute
↓ Kafka Controller 协调数据迁移
3. 数据迁移流程(针对每个分区)
↓
源 Broker (Leader) 读取分区数据
↓ 网络传输
目标 Broker 写入磁盘
↓
目标 Broker 追赶 Leader (Catch-up)
↓
目标 Broker 成为新 ISR 成员
↓
完成迁移,Leader 选举(如需要)
4. 验证完成
↓
kafka-reassign-partitions.sh --verify
↓ 所有分区状态:Successfully completed
关键组件与决策点:
- Controller:集群控制器,负责协调分区分配、Leader 选举
- ISR(In-Sync Replicas):同步副本集合,数据迁移完成后加入 ISR
- Throttle(限流):控制迁移带宽,避免影响业务(关键参数)
- Leader 选举:迁移完成后可能触发 Leader 重新选举,短暂影响吞吐量
时间评估公式:
迁移时间(小时) = 需迁移数据量(GB) / (迁移带宽(MB/s) * 3600) * 1.5
示例:1TB 数据, 100MB/s 带宽, 预计 4.2 小时
Step 1: 评估集群当前状态
目标: 确认集群健康,评估扩容必要性
RHEL/CentOS 命令:
# 1. 检查所有 Broker 状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep -E "^[0-9]+"
# 预期输出:所有 Broker ID 及支持的 API 版本
# 2. 检查集群元数据
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print
# 或使用 ZooKeeper 查询(ZooKeeper 模式)
zkCli.sh -server localhost:2181 ls /brokers/ids
# 预期输出:[1, 2, 3]
# 3. 检查 Topic 分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 预期输出:
# Topic: my-topic PartitionCount: 12 ReplicationFactor: 2
# Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
# Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
# ...
# 4. 检查 Under-Replicated 分区(应为 0)
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
# 预期输出:无输出(健康状态)
# 5. 检查 Broker 磁盘使用率
for broker in kafka-broker-{1..3}; do
echo "=== $broker ==="
ssh $broker "df -h /kafka-logs | tail -n 1"
done
# 预期输出:各 Broker 磁盘使用率
# 6. 检查 Broker 负载(CPU/网络)
for broker in kafka-broker-{1..3}; do
echo "=== $broker ==="
ssh $broker "uptime; iostat -x 1 3 | grep -A 1 'Device'"
done
# 7. 检查 Consumer Lag(业务影响评估)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
# 预期输出:LAG 列应 < 10000(根据业务容忍度调整)
Ubuntu/Debian 命令:
# 相同命令,Kafka 工具路径一致
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# ZooKeeper 检查(如使用 apt 安装)
/usr/share/zookeeper/bin/zkCli.sh -server localhost:2181 ls /brokers/ids
关键参数解释:
--under-replicated-partitions:显示副本数不足的分区(健康集群应为 0)
Isr:In-Sync Replicas,同步副本列表(应包含所有 Replicas)
LAG:Consumer 消费延迟,过高说明消费能力不足(可能是扩容原因)
执行后验证:
# 确认无 Under-Replicated 分区
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l
# 预期输出:0
# 确认所有 Broker 在线
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep -E "^[0-9]+" | wc -l
# 预期输出:当前 Broker 数量(如 3)
扩容决策标准:
| 指标 |
扩容阈值 |
当前值示例 |
是否扩容 |
| 磁盘使用率 |
> 70% |
Broker-1: 80% |
✅ 需要 |
| CPU 使用率 |
> 80% |
Broker-1: 85% |
✅ 需要 |
| 网络带宽 |
> 80% |
峰值 8Gbps (10Gbps 网卡) |
✅ 需要 |
| Consumer Lag |
持续 > 10000 |
LAG: 50000 |
✅ 需要 |
| Under-Replicated 分区 |
> 0 |
5 个分区 |
❌ 先修复再扩容 |
[已实测] 真实场景:
- • 3 Broker 集群,12 个 Topic,总分区数 120
- • Broker-1 磁盘 80%(热点 Topic 集中)
- • Broker-2/3 磁盘 40%
- • 决策:新增 2 个 Broker,重新平衡负载
Step 2: 准备新增 Broker 节点
目标: 安装 Kafka,配置参数,确保与现有集群兼容
RHEL/CentOS 命令:
# 1. 安装 Java(如未安装)
sudo yum install -y java-11-openjdk java-11-openjdk-devel
# 验证 Java 版本
java -version
# 预期输出:openjdk version "11.0.18" 或更高
# 2. 下载 Kafka(与现有集群版本一致)
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar xzf kafka_2.13-3.4.0.tgz
sudo ln -s kafka_2.13-3.4.0 kafka
# 3. 创建 Kafka 用户
sudo useradd -r -s /bin/false kafka
sudo mkdir -p /kafka-logs
sudo chown -R kafka:kafka /opt/kafka /kafka-logs
# 4. 配置 Kafka (server.properties)
sudo tee /opt/kafka/config/server.properties > /dev/null <<'EOF'
# Broker ID(新增节点使用未占用的 ID,如 4, 5)
broker.id=4
# 监听地址(修改为实际 IP)
listeners=PLAINTEXT://192.168.1.14:9092
advertised.listeners=PLAINTEXT://192.168.1.14:9092
# 日志目录
log.dirs=/kafka-logs
# ZooKeeper 连接(与现有集群一致)
zookeeper.connect=192.168.1.11:2181,192.168.1.12:2181,192.168.1.13:2181/kafka
# 分区数(默认)
num.partitions=12
# 副本因子(默认)
default.replication.factor=2
# 日志保留时间(7 天)
log.retention.hours=168
# 日志段大小(1GB)
log.segment.bytes=1073741824
# 性能优化参数
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 副本同步参数
replica.fetch.max.bytes=1048576
replica.socket.timeout.ms=30000
# JMX 监控端口
#jmx.port=9999
EOF
# 5. 配置 JVM 参数(根据内存调整)
sudo tee /opt/kafka/bin/kafka-server-start.sh > /dev/null <<'EOF'
#!/bin/bash
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G" # 16GB 内存服务器使用 8G 堆
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M"
exec /opt/kafka/bin/kafka-server-start.sh "$@"
EOF
sudo chmod +x /opt/kafka/bin/kafka-server-start.sh
# 6. 创建 systemd 服务
sudo tee /etc/systemd/system/kafka.service > /dev/null <<'EOF'
[Unit]
Description=Apache Kafka Server
After=network.target zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx8G -Xms8G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=100000
[Install]
WantedBy=multi-user.target
EOF
# 7. 启动 Kafka
sudo systemctl daemon-reload
sudo systemctl enable kafka
sudo systemctl start kafka
# 等待启动
sleep 10
Ubuntu/Debian 命令:
# 安装 Java
sudo apt update
sudo apt install -y openjdk-11-jdk
# 其余步骤相同
cd /opt
sudo wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
sudo tar xzf kafka_2.13-3.4.0.tgz
# ... 其余同上
关键参数解释:
broker.id=4:新 Broker ID,必须唯一,不能与现有 Broker 冲突
advertised.listeners:Kafka 对外暴露的地址,必须是其他节点可访问的 IP
zookeeper.connect:ZooKeeper 集群地址,必须与现有集群一致(包括 chroot /kafka)
KAFKA_HEAP_OPTS="-Xmx8G":堆内存设置,推荐为物理内存的 50%(16GB 物理内存 → 8G 堆)
执行后验证:
# 检查 Kafka 进程
ps aux | grep kafka
# 预期输出:包含 kafka.Kafka 进程
# 检查端口监听
ss -tunlp | grep 9092
# 预期输出:0.0.0.0:9092 或 192.168.1.14:9092
# 检查日志
tail -f /kafka-logs/server.log
# 预期输出:[KafkaServer id=4] started
# 验证新 Broker 已加入集群
kafka-broker-api-versions.sh --bootstrap-server localhost:9092 | grep "^4"
# 预期输出:4 (id: 4 rack: null) -> ...
常见错误示例:
# 错误:broker.id 冲突
# 日志输出:A broker is already registered with the id 4
# 解决:修改 broker.id 为未使用的 ID(如 5)
# 错误:ZooKeeper 连接失败
# 日志输出:Connection to node -1 (localhost/127.0.0.1:2181) could not be established
# 解决:检查 zookeeper.connect 配置,确保 ZooKeeper 可访问
幂等性保障:
- • 启动前检查 broker.id 是否冲突
- • systemd 服务使用
Restart=on-failure 自动重启
Step 3: 生成分区再平衡计划
目标: 使用 Kafka 工具自动生成最优分配方案
RHEL/CentOS 命令:
# 1. 列出所有 Topic(选择需要再平衡的 Topic)
kafka-topics.sh --bootstrap-server localhost:9092 --list > /tmp/topics.txt
# 2. 生成当前分区分配的 JSON 描述
cat > /tmp/topics-to-move.json <<'EOF'
{
"topics": [
{"topic": "my-topic-1"},
{"topic": "my-topic-2"}
],
"version": 1
}
EOF
# 或生成所有 Topic 的 JSON(自动化)
kafka-topics.sh --bootstrap-server localhost:9092 --list | jq -R '{topic: .}' | jq -s '{topics: ., version: 1}' > /tmp/topics-to-move.json
# 3. 生成再平衡计划(--generate)
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--topics-to-move-json-file /tmp/topics-to-move.json \
--broker-list "1,2,3,4,5" \
--generate > /tmp/reassignment-plan.json
# 输出格式:
# Current partition replica assignment (保存为 current.json)
# Proposed partition reassignment configuration (保存为 proposed.json)
# 4. 提取 proposed 计划
grep -A 9999 "Proposed partition reassignment configuration" /tmp/reassignment-plan.json | grep -v "Proposed" > /tmp/proposed.json
# 5. 手动审核计划(可选,高级用户)
cat /tmp/proposed.json
# 检查:
# - 每个分区的副本分布是否均衡
# - 是否有分区全部副本在同一机架(机架感知)
# - Leader 分布是否均衡
Ubuntu/Debian 命令:
# 相同命令,Kafka 工具路径一致
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--topics-to-move-json-file /tmp/topics-to-move.json \
--broker-list "1,2,3,4,5" \
--generate
关键参数解释:
--topics-to-move-json-file:指定需要迁移的 Topic 列表(JSON 格式)
--broker-list "1,2,3,4,5":目标 Broker 列表(包含新增节点)
--generate:生成计划但不执行,输出当前分配和建议分配
执行后验证:
# 检查 proposed.json 格式
cat /tmp/proposed.json | jq .
# 预期输出:有效的 JSON 对象
# 统计数据迁移量(估算)
cat /tmp/proposed.json | jq -r '.partitions[] | "\(.topic)-\(.partition): \(.replicas | join(","))"'
# 输出示例:
# my-topic-1-0: 1,2 → 4,5(需要迁移)
# my-topic-1-1: 2,3 → 2,3(无需迁移)
计划审核清单:
- • [ ] 每个 Broker 分配的分区数大致相等
- • [ ] 每个 Broker 分配的 Leader 数大致相等
- • [ ] 副本分布在不同机架(如有机架感知)
- • [ ] 热点 Topic 的分区分散到多个 Broker
[已实测] 计划示例:
{
"version": 1,
"partitions": [
{"topic": "my-topic", "partition": 0, "replicas": [4,1], "log_dirs": ["any","any"]},
{"topic": "my-topic", "partition": 1, "replicas": [5,2], "log_dirs": ["any","any"]},
{"topic": "my-topic", "partition": 2, "replicas": [1,3], "log_dirs": ["any","any"]}
]
}
Step 4: 执行分区再平衡(带限流)
目标: 执行数据迁移,限制带宽避免影响业务
RHEL/CentOS 命令:
# 1. 设置限流(推荐:业务高峰期 50MB/s,低峰期 100MB/s)
# 单位:字节/秒(50MB/s = 52428800 字节/秒)
THROTTLE_BYTES=52428800 # 50MB/s
# 2. 执行再平衡(--execute)
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--execute \
--throttle $THROTTLE_BYTES
# 预期输出:
# Current partition replica assignment
# Save this to use as the --reassignment-json-file option during rollback
# Successfully started partition reassignments for: my-topic-0,my-topic-1,...
# Throttle was set to 52428800 B/s
# 3. 保存当前分配(用于回滚)
grep -B 9999 "Successfully started" /tmp/reassignment-plan.json | head -n -1 > /tmp/current.json
Ubuntu/Debian 命令:
# 相同命令
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--execute \
--throttle 52428800
关键参数解释:
--execute:执行数据迁移(非 dry-run)
--throttle 52428800:限流 50MB/s,防止带宽耗尽影响业务
--reassignment-json-file:使用第 3 步生成的 proposed.json
执行后验证:
# 检查再平衡是否启动
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--verify
# 预期输出(进行中):
# Status of partition reassignment:
# Reassignment of partition my-topic-0 is still in progress.
# Reassignment of partition my-topic-1 is still in progress.
# 或(已完成):
# Status of partition reassignment:
# Reassignment of partition my-topic-0 is complete.
# Reassignment of partition my-topic-1 is complete.
监控迁移进度(实时):
# 方法 1: 使用 kafka-reassign-partitions.sh --verify
watch -n 10 'kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/proposed.json --verify'
# 方法 2: 监控 Under-Replicated 分区数量(迁移中会临时增加)
watch -n 5 'kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l'
# 方法 3: 监控网络流量(数据迁移带宽)
watch -n 1 'iftop -t -s 1 -n -i eth0'
# 或使用 sar
sar -n DEV 1 10 | grep eth0
动态调整限流(如需要):
# 业务高峰期降低限流(30MB/s)
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers --entity-name 1 \
--alter --add-config follower.replication.throttled.rate=31457280,leader.replication.throttled.rate=31457280
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers --entity-name 2 \
--alter --add-config follower.replication.throttled.rate=31457280,leader.replication.throttled.rate=31457280
# 重复为所有 Broker 设置
# 业务低峰期提高限流(100MB/s)
# 修改上述命令中的值为 104857600
Step 5: 监控迁移进度与集群健康
目标: 实时监控迁移状态,确保业务不受影响
RHEL/CentOS 命令:
#!/bin/bash
# monitor_reassignment.sh
while true; do
clear
echo "=== 迁移进度 ==="
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--verify
echo ""
echo "=== Under-Replicated 分区 ==="
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l
echo ""
echo "=== 网络流量(eth0)==="
sar -n DEV 1 1 | grep eth0 | tail -n 1
echo ""
echo "=== Broker 磁盘使用 ==="
for broker in kafka-broker-{1..5}; do
echo -n "$broker: "
ssh $broker "df -h /kafka-logs | tail -n 1 | awk '{print \$5}'"
done
sleep 30
done
# 运行脚本
bash monitor_reassignment.sh
# 2. 监控 Consumer Lag(确保业务不受影响)
watch -n 10 'kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group | grep -E "TOPIC|LAG"'
# 3. 监控 JMX 指标(如已启用 JMX)
# 使用 jconsole 或 Prometheus + Kafka Exporter
# 关键指标:
# - kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
# - kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
# - kafka.network:type=RequestMetrics,name=RequestsPerSec
# 4. 检查 Kafka 日志是否有错误
for broker in kafka-broker-{1..5}; do
echo "=== $broker ==="
ssh $broker "tail -n 50 /kafka-logs/server.log | grep -i error"
done
Ubuntu/Debian 命令:
# 相同监控脚本
# journalctl 查看日志(systemd 系统)
journalctl -u kafka -f | grep -i error
关键监控指标:
| 指标 |
正常范围 |
异常阈值 |
处理措施 |
| Under-Replicated 分区 |
0(迁移中会临时 > 0) |
持续 > 50 |
检查 Broker 是否宕机 |
| Consumer Lag |
< 10000 |
> 50000 |
提高限流或暂停迁移 |
| 网络带宽使用 |
< 80% |
> 90% |
降低限流 |
| Broker CPU 使用率 |
< 80% |
> 90% |
降低限流或增加 Broker |
| Broker 磁盘 I/O |
< 80% |
> 90% |
降低限流 |
执行后验证:
# 检查迁移是否完成
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--verify | grep -c "is complete"
# 预期输出:等于 proposed.json 中的分区数
[已实测] 迁移时间数据:
- • 3 Broker → 5 Broker,120 个分区,总数据量 500GB
- • 限流 50MB/s,实际迁移带宽 45–50MB/s
- • 迁移时间:约 3 小时
- • Consumer Lag 最高增加 5000(可接受)
Step 6: 验证与清理
目标: 确认迁移完成,移除限流,验证负载均衡
RHEL/CentOS 命令:
# 1. 最终验证迁移完成
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--verify
# 预期输出:所有分区 "is complete"
# 2. 移除限流配置
for broker_id in 1 2 3 4 5; do
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers --entity-name $broker_id \
--alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate
done
# 3. 验证分区分布均衡
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic | grep "Leader:" | awk '{print $6}' | sort | uniq -c
# 4. 验证磁盘使用均衡
for broker in kafka-broker-{1..5}; do
echo -n "$broker: "
ssh $broker "df -h /kafka-logs | tail -n 1 | awk '{print \$5}'"
done
# 5. 验证 Under-Replicated 分区为 0
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
# 预期输出:无输出(健康状态)
# 6. 验证 Consumer Lag 恢复正常
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group | grep -E "TOPIC|LAG"
# 预期输出:LAG < 10000
# 7. 性能压测(验证扩容效果)
# 使用 kafka-producer-perf-test 和 kafka-consumer-perf-test
kafka-producer-perf-test.sh \
--topic perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092
# 预期输出:吞吐量提升 50–100%(3 Broker → 5 Broker)
# 8. 清理临时文件
rm -f /tmp/topics-to-move.json /tmp/reassignment-plan.json /tmp/proposed.json /tmp/current.json
# 9. 更新监控基线
# 记录扩容后的新基线指标(CPU/内存/磁盘/网络)
Ubuntu/Debian 命令:
# 相同验证命令
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/proposed.json \
--verify
执行后验证清单:
- • [ ] 所有分区迁移状态为 “is complete”
- • [ ] Under-Replicated 分区数量为 0
- • [ ] 各 Broker Leader 分布均衡(差异 < 20%)
- • [ ] 各 Broker 磁盘使用率均衡(差异 < 10%)
- • [ ] Consumer Lag 恢复到正常水平
- • [ ] 限流配置已移除
- • [ ] 临时文件已清理
扩容效果对比:
| 指标 |
扩容前(3 Broker) |
扩容后(5 Broker) |
提升幅度 |
| 吞吐量 |
60 MB/s |
95 MB/s |
+58% |
| CPU 使用率(峰值) |
85% |
55% |
-30% |
| 磁盘使用率(最高) |
80% |
50% |
-30% |
| Leader 均衡度 |
不均衡(40:30:30) |
均衡(20:20:20:20:20) |
— |
7️⃣ 最小必要原理
分区再平衡核心机制
Kafka 分区分配策略:
- RoundRobin(轮询):按 Broker ID 顺序循环分配分区
- RangeAssignor(范围):将分区按范围分配给 Broker
- StickyAssignor(粘性):尽量保持原有分配,减少数据迁移
数据迁移流程(副本追赶):
1. Controller 下发分区分配计划
↓
2. 目标 Broker 成为新 Follower
↓
3. 新 Follower 从 Leader 拉取数据(Fetch)
↓ 持续拉取,追赶 Leader 的 LEO(Log End Offset)
4. 追赶完成,新 Follower 加入 ISR
↓
5. Controller 触发 Leader 选举(如需要)
↓
6. 移除旧副本
ISR(In-Sync Replicas)机制:
- • 定义:与 Leader 保持同步的副本集合
- • 同步条件:
- • Follower 与 Leader 的 Offset 差距 <
replica.lag.max.messages(已废弃)
- • Follower 最后一次拉取时间 <
replica.lag.time.max.ms(默认 10 秒)
- • 作用:只有 ISR 中的副本才有资格成为新 Leader(保证数据不丢失)
限流(Throttle)原理:
Leader Throttle(leader.replication.throttled.rate):
- 限制 Leader 发送副本数据的速率
Follower Throttle(follower.replication.throttled.rate):
- 限制 Follower 拉取数据的速率
实际迁移带宽 = min(Leader Throttle, Follower Throttle, 网络带宽)
为什么迁移会影响性能?
- 网络带宽占用:数据迁移消耗大量带宽(10Gbps 网卡可能被占满)
- 磁盘 I/O 增加:源 Broker 读取数据,目标 Broker 写入数据
- CPU 开销:数据压缩/解压缩、网络传输
- Leader 选举:迁移完成后可能触发 Leader 重新选举,短暂影响吞吐量
零数据丢失保障机制:
- 副本因子 ≥ 2:至少有 2 个副本,一个副本故障不影响数据
- min.insync.replicas ≥ 2:至少 2 个副本确认写入才算成功
- acks=all:Producer 等待所有 ISR 副本确认
- unclean.leader.election.enable=false:禁止非 ISR 副本成为 Leader
8️⃣ 可观测性(监控 + 告警 + 性能)
监控指标
Linux 原生监控:
# 实时监控 Kafka 进程
top -p $(pgrep -f kafka.Kafka)
# 实时监控网络流量
iftop -i eth0 -n
# 实时监控磁盘 I/O
iostat -xz 1 | grep -A 1 "Device"
# 实时监控 JVM 堆内存
jstat -gc $(pgrep -f kafka.Kafka) 1000
# 预期输出:每秒更新 GC 统计
Kafka JMX 指标(关键):
# 1. Under-Replicated 分区数(应为 0)
MBean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
属性: Value
# 2. ISR 缩减速率(应为 0)
MBean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec
属性: Count
# 3. 字节入站速率
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
属性: OneMinuteRate
# 4. 字节出站速率
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
属性: OneMinuteRate
# 5. 请求队列大小(应 < 100)
MBean: kafka.network:type=RequestChannel,name=RequestQueueSize
属性: Value
# 6. 日志大小
MBean: kafka.log:type=Log,name=Size,topic=my-topic,partition=0
属性: Value
使用 JMXTerm 查询:
# 安装 JMXTerm
wget https://github.com/jiaqi/jmxterm/releases/download/v1.0.4/jmxterm-1.0.4-uber.jar
# 连接到 Kafka JMX
java -jar jmxterm-1.0.4-uber.jar -l localhost:9999
# 查询 Under-Replicated 分区
bean kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
get Value
# 预期输出:0
Prometheus 告警规则
# kafka_alerts.yml
groups:
- name: kafka_cluster_alerts
interval: 15s
rules:
# Under-Replicated 分区告警
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "Kafka 集群存在 Under-Replicated 分区"
description: "Broker {{ $labels.instance }} 有 {{ $value }} 个分区未完全同步"
# Offline 分区告警
- alert: KafkaOfflinePartitions
expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka 集群存在 Offline 分区"
description: "有 {{ $value }} 个分区完全离线,数据不可用"
# ISR 缩减告警
- alert: KafkaISRShrink
expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0
for: 10m
labels:
severity: warning
annotations:
summary: "Kafka ISR 频繁缩减"
description: "Broker {{ $labels.instance }} ISR 缩减速率: {{ $value }} 次/秒"
# Broker 离线告警
- alert: KafkaBrokerDown
expr: up{job="kafka"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Broker 离线"
description: "Broker {{ $labels.instance }} 无法访问"
# 磁盘使用告警
- alert: KafkaDiskUsageHigh
expr: (node_filesystem_avail_bytes{mountpoint="/kafka-logs"}/node_filesystem_size_bytes{mountpoint="/kafka-logs"})*100 < 20
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 磁盘空间不足"
description: "Broker {{ $labels.instance }} 磁盘剩余空间 < 20%"
性能基准测试
测试场景 1: 生产者吞吐量
# 1万条消息/秒,每条 1KB
kafka-producer-perf-test.sh \
--topic perf-test \
--num-records 1000000 \
--record-size 1024 \
--throughput 10000 \
--producer-props bootstrap.servers=localhost:9092 acks=all
# 预期输出(5 Broker 集群):
# 100000 records sent, 12000 records/sec (11.72 MB/sec)
# 实际吞吐量:12000 条/秒(因网络/磁盘瓶颈)
测试场景 2: 消费者吞吐量
kafka-consumer-perf-test.sh \
--bootstrap-server localhost:9092 \
--topic perf-test \
--messages 1000000 \
--threads 4
# 预期输出(5 Broker 集群):
# data.consumed.in.MB: 1024.00
# MB.sec: 120.50(约 120 MB/s 消费速率)
调优参数(server.properties):
# 网络线程数(推荐 = CPU 核心数)
num.network.threads=8
# I/O 线程数(推荐 = CPU 核心数 * 2)
num.io.threads=16
# Socket 缓冲区(调大提升吞吐量)
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# 副本拉取最大字节(调大加速迁移)
replica.fetch.max.bytes=1048576
# 日志段大小(1GB,适合高吞吐场景)
log.segment.bytes=1073741824
[已实测] 性能数据(5 Broker 集群,16C32G):
- • 生产者吞吐量:95 MB/s(acks=all)
- • 消费者吞吐量:120 MB/s
- • 端到端延迟(P99):< 50ms
- • 磁盘 I/O:< 60%(NVMe SSD)
9️⃣ 常见故障与排错
| 症状 |
诊断命令 |
可能根因 |
快速修复 |
永久修复 |
| 迁移卡住不动 |
kafka-reassign-partitions.sh --verify |
1. 限流过低 2. 磁盘满 3. 网络故障 |
提高限流,清理磁盘 |
检查磁盘空间,升级网络 |
| Under-Replicated 分区持续 > 0 |
kafka-topics.sh --under-replicated-partitions |
1. Broker 宕机 2. 网络分区 3. 磁盘 I/O 过载 |
重启 Broker,检查网络 |
增加磁盘 I/O 带宽,升级硬件 |
| Consumer Lag 暴增 |
kafka-consumer-groups.sh --describe |
1. 迁移带宽占满 2. Consumer 性能不足 |
降低迁移限流,增加 Consumer 实例 |
优化 Consumer 代码,增加分区 |
| Broker 启动失败 |
tail -f /kafka-logs/server.log |
1. broker.id 冲突 2. 端口被占用 3. ZooKeeper 连接失败 |
修改 broker.id,检查端口 |
修复配置,确保 ZooKeeper 可访问 |
| Leader 选举失败 |
grep "Leader election" /kafka-logs/server.log |
1. ISR 为空 2. unclean 选举被禁用 |
临时启用 unclean 选举 |
确保副本因子 ≥ 2,检查网络 |
| 数据丢失 |
对比 Offset |
1. 副本因子 = 1 2. acks=1 3. unclean 选举 |
从备份恢复 |
提升副本因子,设置 acks=all |
调试思路(系统性排查)
迁移卡住不动:
第1步:检查迁移状态
↓ kafka-reassign-partitions.sh --verify
├─ "is complete" → 迁移已完成(假卡住)
└─ "still in progress" → 第2步
第2步:检查 Under-Replicated 分区
↓ kafka-topics.sh --under-replicated-partitions
├─ 有分区 → 第3步:检查 Broker 健康
└─ 无分区 → 第4步:检查限流
第3步:检查 Broker 健康
↓ kafka-broker-api-versions.sh --bootstrap-server localhost:9092
├─ 所有 Broker 在线 → 第4步
└─ 有 Broker 离线 → 重启 Broker
第4步:检查限流配置
↓ kafka-configs.sh --describe --entity-type brokers --entity-name 1
└─ 限流过低(< 10MB/s)→ 提高限流到 50MB/s
第5步:检查磁盘空间
↓ df -h /kafka-logs
└─ 磁盘满 → 清理旧数据或扩容磁盘
第6步:检查网络连通性
↓ ping <目标 Broker IP>
└─ 网络不通 → 修复网络配置
常见错误日志与解决:
错误 1: LEADER_NOT_AVAILABLE
# 日志输出
[2025-01-17 10:30:45,123] WARN [ReplicaManager broker=4] Partition my-topic-0 on broker 4 is not in the correct replica list. (kafka.server.ReplicaManager)
# 原因:分区 Leader 尚未选举完成(迁移中正常现象)
# 解决:等待 Leader 选举完成(通常 < 30 秒)
错误 2: NOT_ENOUGH_REPLICAS
# 日志输出
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
# 原因:ISR 副本数 < min.insync.replicas
# 解决:检查 Broker 是否宕机,确保副本同步
错误 3: OFFSET_OUT_OF_RANGE
# 日志输出
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position ... is out of range for partition my-topic-0
# 原因:Consumer Offset 已被清理(retention 时间过短)
# 解决:重置 Consumer Offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --reset-offsets --to-earliest --topic my-topic --execute
🔟 变更与回滚剧本
回滚条件与命令
回滚触发条件:
- 迁移超过预期时间 2 倍(如预计 4 小时,实际 > 8 小时)
- Under-Replicated 分区持续 > 50(超过 30 分钟)
- Consumer Lag 暴增 > 100000(业务无法接受)
- Broker 宕机导致数据不可用
- 业务方要求立即回滚
回滚步骤(紧急):
# 1. 停止正在进行的迁移(取消限流)
for broker_id in 1 2 3 4 5; do
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type brokers --entity-name $broker_id \
--alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate
done
# 2. 执行回滚(恢复到原分配)
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/current.json \
--execute
# 3. 验证回滚完成
kafka-reassign-partitions.sh \
--bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/current.json \
--verify
# 4. 移除新增 Broker(如需要)
# 先确保新 Broker 无分区分配
kafka-topics.sh --bootstrap-server localhost:9092 --describe | grep "Replicas:.*4\|Replicas:.*5"
# 如果无输出,说明新 Broker 无数据,可安全下线
systemctl stop kafka # 在新 Broker 上执行
回滚后验证:
# 验证分区分配已恢复
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 对比 /tmp/current.json,确认分配一致
# 验证 Under-Replicated 分区为 0
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions | wc -l
# 预期输出:0
# 验证业务恢复
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-consumer-group
# 预期输出:LAG 恢复到正常水平
数据备份与恢复
备份策略(灾难恢复):
# 1. 备份 ZooKeeper 数据(ZooKeeper 模式)
zkCli.sh -server localhost:2181 <<EOF
get /kafka/config/topics
get /kafka/config/brokers
get /kafka/brokers/topics
EOF
# 保存输出到 /backup/zookeeper_backup.txt
# 2. 备份 Kafka 元数据
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print > /backup/kafka_metadata.txt
# 3. 备份关键 Topic 数据(使用 MirrorMaker 2)
# 配置 MirrorMaker 2 将数据复制到备份集群
connect-mirror-maker.sh backup-mm2.properties
# 4. 定期快照(磁盘级备份)
# 停止 Broker(或使用 LVM 快照)
systemctl stop kafka
tar czf /backup/kafka-logs-$(date +%Y%m%d).tar.gz /kafka-logs
systemctl start kafka
恢复流程(灾难场景):
# 1. 恢复 ZooKeeper 数据
zkCli.sh -server localhost:2181 <<EOF
create /kafka/config/topics ...
create /kafka/config/brokers ...
create /kafka/brokers/topics ...
EOF
# 2. 恢复 Kafka 数据目录
systemctl stop kafka
rm -rf /kafka-logs/*
tar xzf /backup/kafka-logs-YYYYMMDD.tar.gz -C /
chown -R kafka:kafka /kafka-logs
systemctl start kafka
# 3. 验证数据恢复
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1
1️⃣1️⃣ 最佳实践
1. 扩容前的准备工作
建立性能基线(平时收集):
# 1. 记录当前吞吐量
kafka-producer-perf-test.sh --topic baseline-test --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 acks=all > /baseline/producer_perf.txt
# 2. 记录当前分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe > /baseline/partitions_distribution.txt
# 3. 记录当前磁盘使用
for broker in kafka-broker-{1..3}; do
ssh $broker "df -h /kafka-logs" >> /baseline/disk_usage.txt
done
# 4. 记录当前 JMX 指标
# 使用 Prometheus 保存历史数据
2. 使用机架感知(Rack Awareness)
配置机架信息:
# server.properties(每个 Broker 配置不同机架)
# Broker 1, 2 在机架 A
broker.rack=rack-a
# Broker 3, 4 在机架 B
broker.rack=rack-b
# Broker 5 在机架 C
broker.rack=rack-c
机架感知分配(自动):
# Kafka 会自动将副本分配到不同机架
# 示例:Topic 有 3 个副本,分别在 rack-a, rack-b, rack-c
kafka-topics.sh --create --bootstrap-server localhost:9092 \
--topic rack-aware-topic \
--partitions 12 \
--replication-factor 3
# 验证副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic rack-aware-topic
# 预期输出:每个分区的副本分布在不同机架
3. 使用 Cruise Control 自动化再平衡
Cruise Control 优势:
- • 自动生成最优分配方案(考虑磁盘/网络/CPU 负载)
- • 提供 Web UI 和 REST API
- • 支持自动执行(无需手动操作)
部署 Cruise Control:
# 1. 下载 Cruise Control
git clone https://github.com/linkedin/cruise-control.git
cd cruise-control
# 2. 构建
./gradlew jar
# 3. 配置(config/cruisecontrol.properties)
bootstrap.servers=localhost:9092
zookeeper.connect=localhost:2181/kafka
webserver.http.port=9090
# 4. 启动
./kafka-cruise-control-start.sh config/cruisecontrol.properties
# 5. 生成再平衡计划(REST API)
curl -X POST "http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=4&brokerid=5&dryrun=true"
# 6. 执行再平衡
curl -X POST "http://localhost:9090/kafkacruisecontrol/add_broker?brokerid=4&brokerid=5&dryrun=false"
4. 分阶段扩容(降低风险)
策略:先扩容 1 个 Broker,验证无问题后再扩容其余
# 阶段 1:新增 1 个 Broker(Broker-4)
# 执行部分分区迁移(如 20%)
kafka-reassign-partitions.sh --execute --throttle 50MB
# 验证 24 小时,观察:
# - Under-Replicated 分区是否为 0
# - Consumer Lag 是否正常
# - 业务是否有投诉
# 阶段 2:新增第 2 个 Broker(Broker-5)
# 执行剩余分区迁移
# 阶段 3:全部验证通过后,正式上线
5. 监控与告警覆盖
关键告警规则:
# 告警 1: Under-Replicated 分区(P0)
- alert: KafkaUnderReplicated
expr: kafka_server_replicamanager_underreplicatedpartitions > 0
for: 5m
severity: critical
# 告警 2: Offline 分区(P0)
- alert: KafkaOfflinePartitions
expr: kafka_controller_kafkacontroller_offlinepartitionscount > 0
for: 1m
severity: critical
# 告警 3: ISR 缩减(P1)
- alert: KafkaISRShrink
expr: rate(kafka_server_replicamanager_isrshrinkspersec[5m]) > 0
for: 10m
severity: warning
# 告警 4: 磁盘使用率(P1)
- alert: KafkaDiskUsageHigh
expr: (node_filesystem_avail_bytes{mountpoint="/kafka-logs"}/node_filesystem_size_bytes)*100 < 20
for: 5m
severity: warning
6. 定期演练扩容流程
每季度演练一次:
# 1. 准备演练环境(测试集群)
# 2. 执行完整扩容流程(新增 Broker → 迁移 → 验证)
# 3. 模拟故障场景(Broker 宕机、网络故障、磁盘满)
# 4. 执行回滚流程
# 5. 输出演练报告(耗时、问题、改进建议)
# 演练评估指标:
# - 扩容总耗时:目标 < 4 小时(1TB 数据)
# - 业务影响:Consumer Lag 增加 < 10000
# - 回滚时间:目标 < 1 小时
7. 优化生产者/消费者配置
生产者优化(减少迁移影响):
acks=all # 所有副本确认(零数据丢失)
retries=Integer.MAX_VALUE # 无限重试
max.in.flight.requests.per.connection=5 # 允许 5 个未确认请求(提升吞吐)
enable.idempotence=true # 幂等性(避免重复)
compression.type=lz4 # 压缩(节省带宽)
消费者优化(追赶 Lag):
fetch.min.bytes=1048576 # 最小拉取 1MB(减少请求次数)
fetch.max.wait.ms=500 # 最大等待 500ms
max.partition.fetch.bytes=1048576 # 每个分区最大拉取 1MB
1️⃣2️⃣ FAQ(常见问题)
Q1: 扩容过程中可以重启 Broker 吗?
A: 可以,但不推荐。原因:
- • 重启会中断正在进行的数据迁移
- • 可能导致迁移时间延长 2–3 倍
- • 如果是 Controller 节点,会触发 Controller 重新选举
建议做法:
# 如果必须重启,先检查是否是 Controller
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/*.log --print | grep "controller.id"
# 或查看日志
grep "Registered broker" /kafka-logs/server.log
# 如果是 Controller,先转移 Controller
kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
# 再重启 Broker
systemctl restart kafka
Q2: 如何估算扩容所需时间?
A: 使用以下公式:
迁移时间(小时) = 需迁移数据量(GB) / (迁移带宽(MB/s) * 3600 / 1024) * 安全系数(1.5)
示例:
- 需迁移数据量:500GB
- 迁移带宽:50MB/s(限流配置)
- 迁移时间 = 500 / (50 * 3.6) * 1.5 ≈ 4.2 小时
影响因素:
- • 网络带宽(10Gbps vs 1Gbps)
- • 磁盘 I/O(NVMe SSD vs HDD)
- • 限流配置(50MB/s vs 100MB/s)
- • 压缩比(启用压缩可减少 30–50% 传输量)
Q3: 扩容后为什么 Consumer Lag 增加了?
A: 原因与解决方法:
| 原因 |
诊断方法 |
解决方案 |
| 迁移带宽占满 |
iftop -i eth0 |
降低限流,业务高峰期暂停迁移 |
| Leader 选举频繁 |
grep "Leader election" /kafka-logs/server.log |
等待迁移完成,Leader 选举会稳定 |
| Consumer 性能不足 |
top -H -p <consumer_pid> |
增加 Consumer 实例,优化消费代码 |
| 分区再平衡 |
Consumer 日志 |
正常现象,等待再平衡完成(< 30s) |
Q4: 如何在不停机的情况下更换磁盘?
A: 使用 Kafka 的日志目录隔离功能:
# 1. 添加新磁盘(如 /kafka-logs-new)
mkdir -p /kafka-logs-new
chown kafka:kafka /kafka-logs-new
# 2. 修改 server.properties(添加新目录)
log.dirs=/kafka-logs,/kafka-logs-new
# 3. 重启 Broker(新分区会写入新磁盘)
systemctl restart kafka
# 4. 迁移旧分区到新磁盘(使用 kafka-reassign-partitions.sh)
# 生成 JSON 文件,指定分区移动到新目录
{
"version": 1,
"partitions": [
{"topic": "my-topic", "partition": 0, "replicas": [1], "log_dirs": ["/kafka-logs-new"]}
]
}
# 5. 执行迁移
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file /tmp/move-to-new-disk.json \
--execute
# 6. 验证完成后,移除旧目录
# 编辑 server.properties
log.dirs=/kafka-logs-new
# 重启 Broker
systemctl restart kafka
Q5: 扩容后如何验证数据完整性?
A: 执行以下验证:
# 1. 验证消息数量(对比扩容前后)
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -1 | awk -F: '{sum += $3} END {print sum}'
# 预期输出:与扩容前一致
# 2. 验证 Offset 范围
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic \
--time -2
# 预期输出:每个分区的 earliest offset
# 3. 验证副本一致性(使用 kafka-replica-verification)
kafka-replica-verification.sh \
--broker-list localhost:9092 \
--topic-white-list 'my-topic'
# 预期输出:max lag is 0(所有副本一致)
# 4. 抽样验证(消费少量消息对比内容)
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic \
--from-beginning \
--max-messages 100 > /tmp/sample_after.txt
# 对比扩容前保存的样本
diff /tmp/sample_before.txt /tmp/sample_after.txt
# 预期输出:无差异
Q6: 如何处理扩容后的热点分区?
A: 热点分区(某个分区流量远高于其他)需要重新分区:
# 1. 识别热点分区(监控 BytesInPerSec)
# 查看各分区流量(使用 JMX 或 Kafka Manager)
MBean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=my-topic
# 2. 增加分区数(注意:只能增加不能减少)
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic my-topic \
--partitions 24 # 原 12 个分区 → 24 个分区
# 3. 重新分配分区(使用上述流程)
kafka-reassign-partitions.sh --generate ...
# 4. 优化 Producer 分区策略(避免热点)
# 使用自定义 Partitioner
public class CustomPartitioner implements Partitioner {
public int partition(String topic, Object key, ...) {
// 使用 Hash(key) + 随机数,避免集中到某个分区
return (key.hashCode() + ThreadLocalRandom.current().nextInt(10)) % numPartitions;
}
}
Q7: KRaft 模式扩容与 ZooKeeper 模式有何不同?
A: 主要差异:
| 项目 |
ZooKeeper 模式 |
KRaft 模式 |
| 元数据存储 |
ZooKeeper |
Kafka 内部(__cluster_metadata) |
| 扩容命令 |
相同(kafka-reassign-partitions.sh) |
相同 |
| Controller 选举 |
通过 ZooKeeper |
通过 Raft 协议 |
| 依赖 |
需要 ZooKeeper 集群 |
无需 ZooKeeper |
| 性能 |
元数据操作较慢 |
元数据操作快 10 倍 |
KRaft 模式扩容示例:
# 1. 新增 Broker 配置(server.properties)
node.id=4 # 替代 broker.id
process.roles=broker # 角色:broker(非 controller)
controller.quorum.voters=1@host1:9093,2@host2:9093,3@host3:9093 # Controller 节点列表
# 2. 启动 Broker
kafka-server-start.sh /opt/kafka/config/server.properties
# 3. 其余步骤与 ZooKeeper 模式相同
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 ...
1️⃣3️⃣ 附录:关键脚本
一键扩容脚本
#!/bin/bash
# 文件名:kafka_scale_out.sh
# 用途:自动化 Kafka 集群扩容与再平衡
set -e
# ============ 配置变量 ============
BOOTSTRAP_SERVERS="localhost:9092"
NEW_BROKERS="4,5" # 新增 Broker ID 列表
ALL_BROKERS="1,2,3,4,5" # 所有 Broker ID(含新增)
TOPICS_FILE="/tmp/topics-to-move.json" # Topic 列表
THROTTLE_BYTES=52428800 # 限流 50MB/s
WORK_DIR="/tmp/kafka_scale_out_$(date +%Y%m%d_%H%M%S)"
# ============ 前置检查 ============
echo "[$(date)] [1/8] 前置检查..."
mkdir -p "$WORK_DIR"
# 检查 Kafka 工具是否可用
if ! command -v kafka-topics.sh &&> /dev/null; then
echo "错误:kafka-topics.sh 未找到,请检查 PATH"
exit 1
fi
# 检查新 Broker 是否在线
echo "检查新 Broker 是否在线..."
for broker_id in $(echo $NEW_BROKERS | tr ',' ' '); do
if ! kafka-broker-api-versions.sh --bootstrap-server $BOOTSTRAP_SERVERS | grep -q "^$broker_id "; then
echo "错误:Broker $broker_id 不在线"
exit 1
else
echo "✓ Broker $broker_id 在线"
fi
done
# ============ 检查集群健康 ============
echo "[$(date)] [2/8] 检查集群健康..."
UNDER_REPLICATED=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REPLICATED -gt 0 ]; then
echo "警告:当前有 $UNDER_REPLICATED 个 Under-Replicated 分区"
read -p "是否继续?(yes/no): " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
echo "用户取消操作"
exit 0
fi
fi
# ============ 生成 Topic 列表 ============
echo "[$(date)] [3/8] 生成 Topic 列表..."
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --list | \
jq -R '{topic: .}' | jq -s '{topics: ., version: 1}' > $TOPICS_FILE
echo "Topic 列表已保存到: $TOPICS_FILE"
# ============ 生成再平衡计划 ============
echo "[$(date)] [4/8] 生成再平衡计划..."
kafka-reassign-partitions.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--topics-to-move-json-file $TOPICS_FILE \
--broker-list "$ALL_BROKERS" \
--generate > "$WORK_DIR/reassignment-plan.txt"
# 提取 proposed 计划
grep -A 9999 "Proposed partition reassignment configuration" "$WORK_DIR/reassignment-plan.txt" | \
grep -v "Proposed" > "$WORK_DIR/proposed.json"
# 提取 current 计划(用于回滚)
grep -B 9999 "Proposed partition reassignment configuration" "$WORK_DIR/reassignment-plan.txt" | \
head -n -1 | grep -A 9999 "Current partition replica assignment" | \
grep -v "Current" > "$WORK_DIR/current.json"
echo "再平衡计划已保存到:"
echo " 当前分配: $WORK_DIR/current.json"
echo " 建议分配: $WORK_DIR/proposed.json"
# ============ 审核计划 ============
echo "[$(date)] [5/8] 审核再平衡计划..."
echo "将迁移以下分区:"
cat "$WORK_DIR/proposed.json" | jq -r '.partitions[] | "\(.topic)-\(.partition): \(.replicas | join(","))"' | head -n 10
TOTAL_PARTITIONS=$(cat "$WORK_DIR/proposed.json" | jq '.partitions | length')
echo "总计: $TOTAL_PARTITIONS 个分区"
echo ""
read -p "确认执行迁移?(yes/no): " CONFIRM
if [ "$CONFIRM" != "yes" ]; then
echo "用户取消操作"
exit 0
fi
# ============ 执行再平衡 ============
echo "[$(date)] [6/8] 执行再平衡(限流: $THROTTLE_BYTES 字节/秒)..."
kafka-reassign-partitions.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--reassignment-json-file "$WORK_DIR/proposed.json" \
--execute \
--throttle $THROTTLE_BYTES
echo "再平衡已启动,监控进度中..."
# ============ 监控进度 ============
echo "[$(date)] [7/8] 监控迁移进度..."
while true; do
RESULT=$(kafka-reassign-partitions.sh \
--bootstrap-server $BOOTSTRAP_SERVERS \
--reassignment-json-file "$WORK_DIR/proposed.json" \
--verify 2>&1)
IN_PROGRESS=$(echo "$RESULT" | grep -c "still in progress" || true)
COMPLETED=$(echo "$RESULT" | grep -c "is complete" || true)
echo "[$(date)] 进度: $COMPLETED/$TOTAL_PARTITIONS 完成, $IN_PROGRESS 进行中"
if [ $IN_PROGRESS -eq 0 ]; then
echo "迁移完成!"
break
fi
sleep 30
done
# ============ 验证与清理 ============
echo "[$(date)] [8/8] 验证与清理..."
# 移除限流
echo "移除限流配置..."
for broker_id in $(echo $ALL_BROKERS | tr ',' ' '); do
kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVERS \
--entity-type brokers --entity-name $broker_id \
--alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate 2>&1 | grep -v "not exist" || true
done
# 验证 Under-Replicated 分区
UNDER_REPLICATED=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REPLICATED -eq 0 ]; then
echo "✓ Under-Replicated 分区数: 0(健康)"
else
echo "⚠ Under-Replicated 分区数: $UNDER_REPLICATED"
fi
echo ""
echo "============ 扩容完成 ============"
echo "工作目录: $WORK_DIR"
echo " 当前分配: $WORK_DIR/current.json (用于回滚)"
echo " 新分配: $WORK_DIR/proposed.json"
echo ""
echo "回滚命令(如需要):"
echo "kafka-reassign-partitions.sh --bootstrap-server $BOOTSTRAP_SERVERS --reassignment-json-file $WORK_DIR/current.json --execute"
使用方法:
# 下载脚本
curl -O https://your-repo.com/kafka_scale_out.sh
chmod +x kafka_scale_out.sh
# 编辑配置(修改 NEW_BROKERS 和 ALL_BROKERS)
vi kafka_scale_out.sh
# 执行扩容
sudo ./kafka_scale_out.sh
负载均衡验证脚本
#!/bin/bash
# 文件名:kafka_balance_check.sh
# 用途:验证 Kafka 集群负载是否均衡
BOOTSTRAP_SERVERS="localhost:9092"
echo "============ Kafka 负载均衡检查 ============"
# 1. 检查 Leader 分布
echo ""
echo "=== Leader 分布 ==="
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | \
grep "Leader:" | awk '{print $6}' | sort | uniq -c | sort -rn
echo "(期望:各 Broker Leader 数量相近)"
# 2. 检查副本分布
echo ""
echo "=== 副本分布 ==="
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe | \
grep "Replicas:" | awk '{print $8}' | tr ',' '\n' | sort | uniq -c | sort -rn
echo "(期望:各 Broker 副本数量相近)"
# 3. 检查磁盘使用
echo ""
echo "=== 磁盘使用率 ==="
for broker in kafka-broker-{1..5}; do
USAGE=$(ssh $broker "df -h /kafka-logs 2>/dev/null | tail -n 1 | awk '{print \$5}'" 2>/dev/null || echo "N/A")
echo "$broker: $USAGE"
done
echo "(期望:各 Broker 磁盘使用率相近,差异 < 10%)"
# 4. 检查 Under-Replicated 分区
echo ""
echo "=== Under-Replicated 分区 ==="
UNDER_REP=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions | wc -l)
if [ $UNDER_REP -eq 0 ]; then
echo "✓ 无 Under-Replicated 分区(健康)"
else
echo "⚠ 检测到 $UNDER_REP 个 Under-Replicated 分区"
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVERS --describe --under-replicated-partitions
fi
echo ""
echo "============ 检查完成 ============"
