首页 > 解决方案 > 如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件

问题描述

我在 pyspark 上有一个结构化的流式传输作业,它对文件源进行了一些聚合。我有一个 kinesis firehose 将来自 IoT 类型应用程序的数据组合起来,并将数据存储在 S3 位置上,作为每分钟一个文件存储在以下文件夹结构中的不同文件夹中 -

s3://year/month/day/hour/

我的 spark 结构化流式传输作业似乎没有列出我的 S3 存储桶中可用的所有文件。由于上市过程似乎比我设置的 processingTime 花费更多的时间。我收到以下警告,我想知道是否有办法不让这种情况发生。

18/06/15 14:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74364 milliseconds
18/06/15 14:28:42 WARN FileStreamSource: Listed 4449 file(s) in 6822.134244 ms
18/06/15 14:29:06 WARN FileStreamSource: Listed 4449 file(s) in 6478.381219 ms
18/06/15 14:30:08 WARN FileStreamSource: Listed 4450 file(s) in 8285.654031 ms

标签: apache-sparkamazon-s3

解决方案


S3 API List 操作只能用于检索共享前缀的存储桶中的所有对象键。所以根本不可能只列出新的、未处理的对象。Databricks 人员似乎有一个解决方案,您可以在其中设置 S3 以在创建新对象时创建 SQS 记录。Spark 然后检查 SQS 是否有新对象并从 S3 检索特定对象(即不涉及列表)。不幸的是,此连接器似乎仅在 Databricks 集群上可用并且尚未开源,因此如果您使用例如 EMR,则不能使用它(当然,除非您自己实现连接器)。


推荐阅读