性能优化 · 5分钟代码详解:Apache Hudi 如何通过动态布隆过滤器在未知规模的 upsert 操作中实现文件跳过
Apache Hudi 是什么?
Apache Hudi 是一个开源的数据湖仓平台,具备类似数据库的能力,比如 ACID 事务和索引功能。在云存储中处理 PB 级数据的 upsert 操作时,它是性能最出色的框架。
什么是布隆过滤器?
布隆过滤器是一种用于集合成员测试的概率型数据结构。当你问"这个元素当前是否在集合中?"时,布隆过滤器会给出两种答案之一:
- 否:该元素肯定不存在
- 可能:该元素很可能存在,但你需要对照主数据进行验证
它通过牺牲偶尔的误报(可能的情况)来换取巨大的内存节省。
想要交互式地理解布隆过滤器,可以参考 Bloom Filters 页面。
Apache Hudi 如何使用布隆过滤器
Apache Hudi 使用布隆过滤器在 upsert 操作期间跳过文件。当一条记录到达时,Hudi 必须判断它是已经存在(更新)还是全新的(插入)。如果从每个文件中读取记录键,会产生过多的 I/O 开销。
Hudi 的解决方案是:在数据文件的元数据页脚中存储记录键的布隆过滤器。在查找记录键时,Hudi 首先查询布隆过滤器。这个检查的入口点是 HoodieKeyLookupHandle,它在写入阶段处理记录:
apache/hudi:HoodieKeyLookupHandle.java#L80-L90
如果布隆过滤器返回"否",则跳过该文件。如果返回"可能",则将记录键添加到候选列表中,以便对照实际数据进行验证。
mightContain 方法测试一个键是否可能在过滤器中:
apache/hudi:HoodieDynamicBoundedBloomFilter.java#L102-L105
→
apache/hudi:InternalDynamicBloomFilter.java#L115-L128
→
apache/hudi:InternalBloomFilter.java#L153-L167
布隆过滤器的问题
标准布隆过滤器需要预先设定容量。
google/guava:BloomFilter.java#L411-L435
必须指定预期元素数量和期望的误报率,才能正确确定过滤器的大小——需要分配的比特数和哈希函数数量。
google/guava:BloomFilter.java#L525-L554
如果插入的元素数量超过容量,误报概率会急剧上升。对于大多数数据工作负载来说,预测每个文件的记录数几乎是不可能的。如果将过滤器大小设置为最大可能值,又会造成内存和存储的浪费。
那么问题来了:如何在不预先知道数据集大小的情况下,实现概率性的成员检查?
解决方案
Hudi 使用一个可按需增长到可配置上限的标准布隆过滤器链式数组。该方案分为两个阶段:
-
增长阶段:当记录数超过当前过滤器的容量时,分配一个新的过滤器并将其链接起来。每个过滤器都保持其原始的误报保证。
-
边界阶段:一旦记录数达到最大容量,停止增长,使用轮询方式写入现有过滤器。误报率会下降,但内存使用保持可预测。
这种设计倾向于性能降级,而不是冒着"无限增长"导致 OOM(内存溢出)的风险。
实现细节
初始化 InternalDynamicBloomFilter
InternalDynamicBloomFilter 使用一个名为 matrix 的数组,其中每个元素都是一个包含位向量的 InternalBloomFilter。它从一个过滤器开始。使用数组而不是 ArrayList 是为了避免扩展期间的内存过度分配。
apache/hudi:InternalDynamicBloomFilter.java#L56-L75
nr(每个过滤器的记录数)是每个过滤器的容量——超过时会创建新过滤器
maxNr(最大记录总数)是所有过滤器组合的硬上限
将新记录路由到正确的过滤器
Hudi 根据当前容量和增长限制将每条记录路由到适当的布隆过滤器。
apache/hudi:InternalDynamicBloomFilter.java#L236-254
上述代码中的三个分支是:
-
reachedMax == true:已达到最大容量。后续插入通过 curMatrixIndex++ % matrix.length 分布到各个过滤器中。
-
当前过滤器已满,但仍有剩余容量:返回 null 以通知调用者分配新过滤器。(matrix.length * nr) < maxNr 检查是否还有增长预算。
-
当前过滤器有容量:返回 matrix[matrix.length - 1],即最近添加的过滤器。
下面是状态图:
状态图
向动态布隆过滤器添加记录键
add() 方法将记录键添加到活动的布隆过滤器中。
apache/hudi:InternalDynamicBloomFilter.java#L77-L94
getActiveStandardBF() 返回 null 表示当前过滤器已满。通过调用 addRow() 向 matrix 添加新过滤器。
apache/hudi:InternalDynamicBloomFilter.java#L210-218
跨所有过滤器检查键是否存在
要检查键是否存在,必须测试 matrix 中的每个过滤器。如果任何一个返回 true,则该键可能存在。
apache/hudi:InternalDynamicBloomFilter.java#L115-128
查询时间随过滤器数量线性增长,但在合理的边界内,相比跳过文件节省的 I/O,这个开销很小。
InternalBloomFilter:基础构建块
matrix 中的每个元素都是一个 InternalBloomFilter 对象,改编自 Apache Hadoop 的实现。
apache/hudi:InternalBloomFilter.java:L108-L120
通过对键进行哈希并在 bits 向量中设置相应的位来添加键:
apache/hudi:InternalBloomFilter.java#L122-L139
通过检查所有 nbHash 位位置来测试成员资格——键是否在集合中。如果任何位为 0,则该键从未被添加过,因为 add() 会将键的所有 nbHash 位位置设置为 1。
apache/hudi:InternalBloomFilter.java#L153-L167
设计选择
软失败优于硬失败。在分布式处理中,无限制的内存增长会导致 OOM 崩溃。Hudi 宁愿接受更高的误报率(导致更多 I/O),也不愿让执行器崩溃。数据峰值会降低性能,但不会影响可用性。
可预测的写入延迟。标准的可调整大小的布隆过滤器可能会触发对现有条目的昂贵重新哈希。Hudi 通过链接新过滤器而不重新哈希来避免这种情况。
平衡降级。一旦满了,写入通过轮询分布,而不是填满一个过滤器。如果一个过滤器达到约 100% 的位饱和度,它总是返回"可能",实际上提供零过滤价值。轮询使所有过滤器保持相似的填充率,因此它们能保持更长时间的部分有用性,误报率逐渐上升。
影响
动态布隆过滤器使 Hudi 能够在不预先知道记录数的情况下,在 upsert 操作期间跳过文件。用户无需针对每个工作负载调整过滤器大小,因为过滤器会在配置的边界内自动适应。这使得 Hudi 的索引层能够处理跨不同数据量的 PB 级 upsert 操作。
主要贡献者
- PR #976: [HUDI-106] 添加 DynamicBloomFilter 支持,作者:@nsivabalan (Sivabalan Narayanan)
- PR #9649: [HUDI-6826] 从 Hadoop 库移植 BloomFilter 类,作者:@yihua (Yi Ethan Guo)
如果我遗漏了 Hudi 动态布隆过滤器的其他主要贡献者,请告诉我!