一、案例概述
在Kafka集群的运维过程中,有时会遇到一个特定问题:某个Topic的数据明明已经超过了配置的最大保留时间,但日志段(Segment)却迟迟没有被清理,而集群内其他Topic的清理机制均工作正常。
问题根因猜测:消息的 CreateTime 时间戳异常
问题的根源可能在于生产者端。Kafka生产者发送消息时,可以传入一个时间戳参数,该时间戳会作为消息的CreateTime被记录在日志中。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, (Iterable)null);
}
Kafka日志清理的核心逻辑是:对于一个日志段(segment),只有当当前时间 - 该段的最大时间戳 > 配置的保留时间(retention.ms)时,这个段才会被判定为“已过期”,从而被删除。
如果生产者出现以下情况:
- 在构造
ProducerRecord时,显式地设置了一个未来的时间戳。
- 或者,生产者所在的主机或容器系统时间严重超前(例如,被错误地设置为2027年)。
那么将导致:
- 只要某个segment中包含一条CreateTime为未来时间(如2027-01-01)的消息,该segment的
segmentMaxTimestamp就会被标记为这个未来的时间点。
- 以当前时间(如2025年12月)来看,该segment永远不满足
now - segmentMaxTimestamp > retention.ms这个删除条件。因此,这个segment永远不会被清理。
- 如果这个有问题的生产者持续向该Topic写入数据,每个新的segment都可能混入几条未来时间戳的消息,最终导致整个Topic的所有segment都无法满足删除条件,数据保留策略实质上失效。
默认情况下,Topic配置message.timestamp.type=CreateTime,这意味Broker会直接使用生产者传递的时间戳来进行保留期的判断,从而放大了此问题的影响。
二、验证过程
要验证上述猜想,核心是检查最早未被删除的日志文件中,是否存在消息的CreateTime大于文件最后修改时间(或某个合理时间点)的情况。
可以通过以下命令进行分析(请将limit变量替换为你认为合理的毫秒级时间戳上限):
/opt/kafka/bin/kafka-dump-log.sh \
--files 00000000000004086150.log \
--print-data-log --deep-iteration 2>/dev/null \
| awk -v limit=1725667200000 '
/CreateTime:/ {
ts = ""
# 提取这一行里的 CreateTime 值
for (i = 1; i <= NF; i++) {
if ($i == "CreateTime:") {
ts = $(i + 1)
break
}
}
if (ts == "") next
# 如果发现大于 limit 的,打印并退出
if (ts > limit) {
print "FUTURE_TIMESTAMP_FOUND:", ts, "line:", $0
exit 0
}
# 否则就把这一条记录的 CreateTime 打印出来,继续往后查
print "CreateTime:", ts
}'
命令解析与结果判断:
- 正常情况下,输出会是一行行
CreateTime: 1725616540727这样的记录。
- 一旦脚本发现第一条
CreateTime值大于指定limit的记录,便会输出类似下方的信息,这证实了我们的猜想。
FUTURE_TIMESTAMP_FOUND: 2002600999673 line: baseOffset: 4086150 lastOffset: 4086150 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 47 isTransactional: false isControl: false position: 0 CreateTime: 2002600999673 size: 652 magic: 2 compresscodec: NONE crc: 640118469 isvalid: true
三、解决办法
确认问题后,可以从以下两种方案中选择其一进行解决:
1. 修改Topic时间戳类型配置
将该Topic的message.timestamp.type从CreateTime改为LogAppendTime。这意味着之后所有新写入消息的时间戳将使用Broker接收到消息时的时间,从而绕过生产者错误时间戳的影响。
kafka-configs.sh --bootstrap-server <broker_host:port> \
--alter --entity-type topics --entity-name your-topic-name \
--add-config message.timestamp.type=LogAppendTime
注意:此配置修改仅对新写入的数据生效。历史遗留的、包含未来时间戳的segment,仍然需要等到真实时间推进到其segmentMaxTimestamp之后,才会被自动清理。
2. 手动删除过期数据
如果希望立即释放磁盘空间,可以使用Kafka提供的工具进行手动数据删除。删除请求提交后,后台清理线程通常会在几分钟内执行。
# 1. 确认待清理分区当前的偏移量范围(示例为partition 0)
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_host:port> --topic your-topic-name --time -1
# 2. 构造删除记录的JSON配置文件。
# 例如,确定要将partition 0的起始偏移量(最早可消费位置)设置为4086151。
cat > delete-your-topic-name.json <<EOF
{
"partitions": [
{
"topic": "your-topic-name",
"partition": 0,
"offset": 4086151
}
],
"version": 1
}
EOF
# 3. 执行删除命令(此操作将丢弃partition 0中偏移量小于4086151的所有消息)
kafka-delete-records.sh \
--bootstrap-server <broker_host:port> \
--offset-json-file delete-your-topic-name.json