apache-spark - 如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件
问题描述
我在 pyspark 上有一个结构化的流式传输作业,它对文件源进行了一些聚合。我有一个 kinesis firehose 将来自 IoT 类型应用程序的数据组合起来,并将数据存储在 S3 位置上,作为每分钟一个文件存储在以下文件夹结构中的不同文件夹中 -
s3://year/month/day/hour/
我的 spark 结构化流式传输作业似乎没有列出我的 S3 存储桶中可用的所有文件。由于上市过程似乎比我设置的 processingTime 花费更多的时间。我收到以下警告,我想知道是否有办法不让这种情况发生。
18/06/15 14:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74364 milliseconds
18/06/15 14:28:42 WARN FileStreamSource: Listed 4449 file(s) in 6822.134244 ms
18/06/15 14:29:06 WARN FileStreamSource: Listed 4449 file(s) in 6478.381219 ms
18/06/15 14:30:08 WARN FileStreamSource: Listed 4450 file(s) in 8285.654031 ms
解决方案
S3 API List 操作只能用于检索共享前缀的存储桶中的所有对象键。所以根本不可能只列出新的、未处理的对象。Databricks 人员似乎有一个解决方案,您可以在其中设置 S3 以在创建新对象时创建 SQS 记录。Spark 然后检查 SQS 是否有新对象并从 S3 检索特定对象(即不涉及列表)。不幸的是,此连接器似乎仅在 Databricks 集群上可用并且尚未开源,因此如果您使用例如 EMR,则不能使用它(当然,除非您自己实现连接器)。
推荐阅读
- python - 在sklearn中的LinearRegression之前转置输入矩阵
- go - 为什么 struct Client(在 coreos/etcd/clientv3/client.go 中)有 PUT 方法
- c# - 发送 JSON 格式的 post 请求
- python - 将重复参数传递给 sqlite 查询的优雅方式
- php - 在 Slim v3 中如何访问路由中的 $container 属性?
- javascript - ReactJS,使用渲染按钮递增/递减渲染值
- android - 如何通过单击两个按钮在单个活动中在 RecyclerView 和 ListView 之间切换?
- php - 尝试在表中插入会话变量时解析 If 语句错误
- c# - 单元测试用例 - MissingMethodException
- android - Android 文件选择器未从 Android Webview 调用