hadoop - 具有许多并行存储桶的 Flink Hadoop Bucketing Sink 性能
问题描述
我正在研究将数据从 Kafka 传输到 S3 Sink 的 Flink 作业的性能。我们正在使用 BucketingSink 来编写 parquet 文件。分桶逻辑根据数据类型、租户(客户)、日期时间、提取 ID 等划分具有文件夹的消息。这导致每个文件存储在由 9-10 层组成的文件夹结构中(s3_bucket:/ 1/2/3/4/5/6/7/8/9/myFile...)
如果数据作为租户类型的消息突发分布,我们会看到写入性能良好,但是当数据更多是在数千个租户、数十种数据类型和多个提取 ID 上的白噪声分布时,我们会有难以置信的损失表演。(大约 300 倍)
附加调试器,似乎问题与在 S3 上同时打开以写入数据的处理程序的数量有关。进一步来说:
研究用于写入 S3 的 hadoop 库,我发现了一些可能的改进设置:
<name>fs.s3a.connection.maximum</name>
<name>fs.s3a.threads.max</name>
<name>fs.s3a.threads.core</name>
<name>fs.s3a.max.total.tasks</name>
但是这些都没有对吞吐量产生很大影响。我还尝试展平文件夹结构以写入单个键,例如 (1_2_3_...) 但这并没有带来任何改进。
注意:测试是在 Flink 1.8 上使用 Hadoop FileSystem (BucketingSink) 完成的,使用 hadoop fs 库 2.6.x 写入 S3(因为我们使用 Cloudera CDH 5.x 作为保存点),所以我们不能切换到 StreamingFileSink .
解决方案
仅此一项大约需要 4-5 秒,打开文件总共需要 6 秒。来自检测调用的日志:
2020-02-07 08:51:05,825 INFO BucketingSink - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO BucketingSink - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO BucketingSink - openNewPartFile FS - completed partPath = s3a://....
这与具有 60 秒不活动翻转的存储桶的默认设置一起 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache /flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195 意味着当我们完成创建最后一个存储桶时,一个插槽上有超过 10 个并行存储桶,第一个存储桶变得陈旧,因此需要轮换生成阻塞情况。
我们通过替换BucketingSink.java
和删除上面提到的 FS 检查解决了这个问题:
LOG.debug("Opening new part file FS verification");
if (!fs.exists(bucketPath)) {
try {
if (fs.mkdirs(bucketPath)) {
LOG.debug("Created new bucket directory: {}", bucketPath);
}
}
catch (IOException e) {
throw new RuntimeException("Could not create new bucket path.", e);
}
}
LOG.debug("Opening new part file FS verification - done");
正如我们看到的,没有它,sink 工作正常,现在文件打开大约需要 1.2 秒。
此外,我们将默认的非活动阈值设置为 5 分钟。通过这些更改,我们可以轻松地处理每个插槽 200 多个存储桶(一旦作业加快速度,它将在所有插槽上摄取,因此推迟非活动超时)
推荐阅读
- button - 分配给多个绘图的脚本不会在电话表应用程序上运行
- animation - 循环播放相同的动画动作而不重置位置
- c++ - 我有一个文本文件,每行包含一个整数。我想打开文本图块并计算文件中的整数个数
- c - C 文件解决任务,然后在命令中打印出随机字符并自行崩溃
- rust - 如何在 rust 中正确处理关闭时的此错误
- c - 免费调用时出现分段错误
- java - 如何在 Windows 上向 HID 打印机发送数据(>64 字节)
- c++ - 如何在 C++ 头文件中获取所有命名空间/类/方法头?
- apache-kafka - Kafka Log compaction也是去重机制吗
- java - 在 JTable 列 Java Swing 中设置右对齐