首页 > 解决方案 > Apache Flink - s3 文件夹监控 - 许多文件丢失

问题描述

今天是个好日子,

我有一个 Flink 作业,它有一个 S3 文件夹作为源,我们不断将数千个小(每个大约 1KB)gzip 文件放入该文件夹,速度约为每分钟 5000 个文件。以下是我在 Scala 中创建该源代码的方式:

    val my_input_format = new TextInputFormat(
        new org.apache.flink.core.fs.Path(my_path))
    my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
    my_input_format.setNestedFileEnumeration(true)

    val my_raw_stream = streamEnv
            .readFile(my_input_format,
                my_path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000)

问题是,在上述 1000 毫秒的监控间隔下,大约 20% 的文件被遗漏了。在 Apache Flink Dashboard 中,在后续操作中,我只能看到记录的文件总数的约 80%(“已发送记录”列)。

如果我增加监控间隔,丢失文件的数量会减少。在 5,000 毫秒时,大约是 10%,在 30,000 毫秒时,只有大约 2% 未命中。

虽然没有记录警告/错误。

我无法在 HDFS 中模拟这一点,因为我无法在我们的集群中达到如此高的文件写入速度。

有人可以帮忙吗。非常感谢。

标签: amazon-s3apache-flink

解决方案


Amazon S3 为列出目录提供最终一致性(请参阅此问题)。

监控源列出目录中的文件,并通过记住它们的最大修改时间戳来跟踪它处理的文件。由于 S3 列表不能保证立即一致,因此最大修改时间戳可能会提前,而时间戳较小的文件可能会丢失。

我认为通过增加监控间隔不能完全解决问题。相反,我们需要一个额外的参数来为最大时间戳添加一个偏移量。如果您可以通过邮件列表或打开Jira 票证与 Flink 社区联系,那就太好了。

============ 更新 ==============

我已经按照 Fabian 的建议实施了更改。功能方面,它已完成并正在工作。需要花费更多时间来编写适当的单元测试/文档。 我的实现在这里


推荐阅读