java - 限制每次执行结构化流查询时扫描的 S3 分区
问题描述
我们目前将一堆 Avro 文件上传到 S3,这些文件按上传时间进行分区,我们希望将它们 ETL 到 Parquet 中,并按事件的 customerId 重新分区。
我们已经使用 spark 结构化流的 POC 解决方案,它似乎在语义上是正确的。我们的代码有一堆看起来像的查询
Dataset<Row> sourceStream = sparkSession.readStream()
.format("com.databricks.spark.avro")
.load("/avro/input/path");
sourceStream
.drop("ingestion_hour")
.withColumn("c_id", new Column("customer_id"))
.repartition(new Column("c_id"))
.writeStream()
.trigger(Trigger.Once())
.outputMode(OutputMode.Append())
.format("parquet")
.option("checkpointLocation", "/checkpoint/path/")
.partitionBy("c_id")
.start("/parquet/output/path");
但是,似乎存在一个问题,即/avro/input/path
前缀下的文件数量不断增加,从而导致每次执行 ETL 所需的时间越来越长,以便在 S3 中列出文件以计算新文件。
我试图将单个流式查询拆分为每小时消耗的多个流式查询,例如/avro/input/path/ingestion_hour=yyyy-MM-dd-HH
并输出到/parquet/output/path
具有检查点路径的同一接收器/checkpoint/path/yyyy-MM-dd-HH
。然后我可以将作业限制为仅执行过去的查询,例如 12 小时。但是,这项工作似乎并不像在多个查询之间共享一个接收器是正确的做法,因为大多数记录没有显示在输出中。
我想知道是否有某种方法可以使这种方法起作用,或者是否有不同的方法来限制列出的文件数量?
解决方案
推荐阅读
- perl - 在linux上自动替换主机密钥
- api - linkedin "此应用程序不允许创建应用程序令牌"
- javascript - CSS 将相似项目设置为与屏幕上最高项目相同的高度
- python-3.x - 切换音乐开/关 Kivy
- c++ - 检查 nullptr 导致 CTD
- java - UnsatisfiedDependencyException java.lang.IllegalArgumentException:查询方法公共抽象 java.util.Optional 的验证失败
- azure - 如何为 Azure Log Analytics 启用自定义日志?
- python - 如何将从日期中提取的小时转换为小时+(以小时为单位的日期)?
- powerbi - Power BI 传递参数以报告
- shell - 根据 shell 脚本中的列值创建文件