首页 > 解决方案 > 具有许多并行存储桶的 Flink Hadoop Bucketing Sink 性能

问题描述

我正在研究将数据从 Kafka 传输到 S3 Sink 的 Flink 作业的性能。我们正在使用 BucketingSink 来编写 parquet 文件。分桶逻辑根据数据类型、租户(客户)、日期时间、提取 ID 等划分具有文件夹的消息。这导致每个文件存储在由 9-10 层组成的文件夹结构中(s3_bucket:/ 1/2/3/4/5/6/7/8/9/myFile...)

如果数据作为租户类型的消息突发分布,我们会看到写入性能良好,但是当数据更多是在数千个租户、数十种数据类型和多个提取 ID 上的白噪声分布时,我们会有难以置信的损失表演。(大约 300 倍)

附加调试器,似乎问题与在 S3 上同时打开以写入数据的处理程序的数量有关。进一步来说: 分析执行

研究用于写入 S3 的 hadoop 库,我发现了一些可能的改进设置:

      <name>fs.s3a.connection.maximum</name>
      <name>fs.s3a.threads.max</name>
      <name>fs.s3a.threads.core</name>
      <name>fs.s3a.max.total.tasks</name>

但是这些都没有对吞吐量产生很大影响。我还尝试展平文件夹结构以写入单个键,例如 (1_2_3_...) 但这并没有带来任何改进。

注意:测试是在 Flink 1.8 上使用 Hadoop FileSystem (BucketingSink) 完成的,使用 hadoop fs 库 2.6.x 写入 S3(因为我们使用 Cloudera CDH 5.x 作为保存点),所以我们不能切换到 StreamingFileSink .

标签: hadoopamazon-s3apache-flink

解决方案


https://lists.apache.org/thread.html/50ef4d26a1af408df8d9abb70589699cb6b26b2600ab6f4464e86ea4%40%3Cdev.flink.apache.org%3E中 Kostas 的建议之后

速度变慢的罪魁祸首是这段代码: https ://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink /streaming/connectors/fs/bucketing/BucketingSink.java#L543-L551

仅此一项大约需要 4-5 秒,打开文件总共需要 6 秒。来自检测调用的日志:

2020-02-07 08:51:05,825 INFO  BucketingSink  - openNewPartFile FS verification
2020-02-07 08:51:09,906 INFO  BucketingSink  - openNewPartFile FS verification - done
2020-02-07 08:51:11,181 INFO  BucketingSink  - openNewPartFile FS - completed partPath = s3a://....

这与具有 60 秒不活动翻转的存储桶的默认设置一起 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache /flink/streaming/connectors/fs/bucketing/BucketingSink.java#L195 意味着当我们完成创建最后一个存储桶时,一个插槽上有超过 10 个并行存储桶,第一个存储桶变得陈旧,因此需要轮换生成阻塞情况。

我们通过替换BucketingSink.java和删除上面提到的 FS 检查解决了这个问题:

        LOG.debug("Opening new part file FS verification");
        if (!fs.exists(bucketPath)) {
            try {
                if (fs.mkdirs(bucketPath)) {
                    LOG.debug("Created new bucket directory: {}", bucketPath);
                }
            }
            catch (IOException e) {
                throw new RuntimeException("Could not create new bucket path.", e);
            }
        }
        LOG.debug("Opening new part file FS verification - done");

正如我们看到的,没有它,sink 工作正常,现在文件打开大约需要 1.2 秒。

此外,我们将默认的非活动阈值设置为 5 分钟。通过这些更改,我们可以轻松地处理每个插槽 200 多个存储桶(一旦作业加快速度,它将在所有插槽上摄取,因此推迟非活动超时)


推荐阅读