首页 > 解决方案 > 限制每次执行结构化流查询时扫描的 S3 分区

问题描述

我们目前将一堆 Avro 文件上传到 S3,这些文件按上传时间进行分区,我们希望将它们 ETL 到 Parquet 中,并按事件的 customerId 重新分区。

我们已经使用 spark 结构化流的 POC 解决方案,它似乎在语义上是正确的。我们的代码有一堆看起来像的查询

Dataset<Row> sourceStream = sparkSession.readStream()
  .format("com.databricks.spark.avro")
  .load("/avro/input/path");

sourceStream
  .drop("ingestion_hour")
  .withColumn("c_id", new Column("customer_id"))
  .repartition(new Column("c_id"))
  .writeStream()
  .trigger(Trigger.Once())
  .outputMode(OutputMode.Append())
  .format("parquet")
  .option("checkpointLocation", "/checkpoint/path/")
  .partitionBy("c_id")
  .start("/parquet/output/path");

但是,似乎存在一个问题,即/avro/input/path前缀下的文件数量不断增加,从而导致每次执行 ETL 所需的时间越来越长,以便在 S3 中列出文件以计算新文件。

我试图将单个流式查询拆分为每小时消耗的多个流式查询,例如/avro/input/path/ingestion_hour=yyyy-MM-dd-HH并输出到/parquet/output/path具有检查点路径的同一接收器/checkpoint/path/yyyy-MM-dd-HH。然后我可以将作业限制为仅执行过去的查询,例如 12 小时。但是,这项工作似乎并不像在多个查询之间共享一个接收器是正确的做法,因为大多数记录没有显示在输出中。

我想知道是否有某种方法可以使这种方法起作用,或者是否有不同的方法来限制列出的文件数量?

标签: javaapache-sparkspark-structured-streaming

解决方案


推荐阅读