首页 > 解决方案 > 一轮只处理几个文件

问题描述

我有一个可行的解决方案,但我正在寻找一些更安全、更好的方法。

每次作业启动时,它都会查找一个自定义检查点,该检查点指示应该从哪个日期开始处理。从源数据帧中,我创建一个从指定开始日期开始的数据帧 - 基于检查点。该解决方案现在限制了必须处理的数据帧的行:

val readFormat = "delta"
val sparkRead = spark.read.format(readFormat)

val fileFormat = if (readFormat == "delta") "" else "." + readFormat
val testData = sparkRead
                  .load(basePath + "/testData/table_name" + fileFormat)
                  .where(!((col("size") < 1)))
                  .where($"modified" >= start)
                  .limit(5000)

对于每个标识符,我从 Azure 存储下载文件,并将内容保存在数据框的新列中:

val tryDownload = testData
                    .withColumn(
                        "fileStringPreview",
                        downloadUDF($"id"))
                     .withColumn(
                            "status",
                            when(
                              (($"fileStringPreview"
                                .startsWith("failed:") === true) ||
                               ($"fileStringPreview"
                                .startsWith("emptyUrl") === true)),
                              lit("failed")).otherwise(
                              lit("succeeded")))

完成此操作后,检查点将按此迭代中处理的元素的最新修改日期更新。

def saveLatest(saved_df: DataFrame, timeSeriesColName: String): Unit = {
val latestTime = saved_df.agg(max(timeSeriesColName)).collect()(0)
try {
  val timespanEnd = latestTime.getTimestamp(0).toInstant().toEpochMilli()
  saveTimestamp(timespanEnd) // this function actually stores the data
} catch {
  case e: java.lang.NullPointerException => {
    LoggingWrapper.log("timespanEnd is null");
  }
}

}

saveLatest(tryDownload, "modified")  

我担心这个限制(5000)解决方案,有没有更好的方法,可以在每次迭代中保持下载指定数量的文件的良好性能?

提前感谢您的建议!:)

标签: scalaperformanceapache-sparklimitfile-processing

解决方案


推荐阅读