amazon-s3 - Apache Flink - s3 文件夹监控 - 许多文件丢失
问题描述
今天是个好日子,
我有一个 Flink 作业,它有一个 S3 文件夹作为源,我们不断将数千个小(每个大约 1KB)gzip 文件放入该文件夹,速度约为每分钟 5000 个文件。以下是我在 Scala 中创建该源代码的方式:
val my_input_format = new TextInputFormat(
new org.apache.flink.core.fs.Path(my_path))
my_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
my_input_format.setNestedFileEnumeration(true)
val my_raw_stream = streamEnv
.readFile(my_input_format,
my_path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000)
问题是,在上述 1000 毫秒的监控间隔下,大约 20% 的文件被遗漏了。在 Apache Flink Dashboard 中,在后续操作中,我只能看到记录的文件总数的约 80%(“已发送记录”列)。
如果我增加监控间隔,丢失文件的数量会减少。在 5,000 毫秒时,大约是 10%,在 30,000 毫秒时,只有大约 2% 未命中。
虽然没有记录警告/错误。
我无法在 HDFS 中模拟这一点,因为我无法在我们的集群中达到如此高的文件写入速度。
有人可以帮忙吗。非常感谢。
解决方案
Amazon S3 为列出目录提供最终一致性(请参阅此问题)。
监控源列出目录中的文件,并通过记住它们的最大修改时间戳来跟踪它处理的文件。由于 S3 列表不能保证立即一致,因此最大修改时间戳可能会提前,而时间戳较小的文件可能会丢失。
我认为通过增加监控间隔不能完全解决问题。相反,我们需要一个额外的参数来为最大时间戳添加一个偏移量。如果您可以通过邮件列表或打开Jira 票证与 Flink 社区联系,那就太好了。
============ 更新 ==============
我已经按照 Fabian 的建议实施了更改。功能方面,它已完成并正在工作。需要花费更多时间来编写适当的单元测试/文档。 我的实现在这里
推荐阅读
- vue.js - 文档未在 nuxtjs 中定义关于集成 jqxwidget
- java - 如何在 IntelliJ 中生成带有组件和类的 UML 图?
- docker - 当我们想使用 github 操作执行 ansible 命令时如何管理 ssh 密钥文件
- spring-boot - Spring Boot Elasticsearch 和 @JsonProperty
- if-statement - 在正文中测试正则表达式之前,需要在主题中设置强制性条件
- android - 是否可以在 HFP 配置文件中选择蓝牙音频编解码器?
- swift - 快速的语法。什么是
在“withUnsafeMutableBytes (_..." - pyspark - 是否可以在 2 个以上的流之间共享 foreachBatch 的 batchId?
- python - 为 UNET 编写特定的损失函数
- github - 如何在 Github Actions 中设置 docker 用户