scala - 一轮只处理几个文件
问题描述
我有一个可行的解决方案,但我正在寻找一些更安全、更好的方法。
每次作业启动时,它都会查找一个自定义检查点,该检查点指示应该从哪个日期开始处理。从源数据帧中,我创建一个从指定开始日期开始的数据帧 - 基于检查点。该解决方案现在限制了必须处理的数据帧的行:
val readFormat = "delta"
val sparkRead = spark.read.format(readFormat)
val fileFormat = if (readFormat == "delta") "" else "." + readFormat
val testData = sparkRead
.load(basePath + "/testData/table_name" + fileFormat)
.where(!((col("size") < 1)))
.where($"modified" >= start)
.limit(5000)
对于每个标识符,我从 Azure 存储下载文件,并将内容保存在数据框的新列中:
val tryDownload = testData
.withColumn(
"fileStringPreview",
downloadUDF($"id"))
.withColumn(
"status",
when(
(($"fileStringPreview"
.startsWith("failed:") === true) ||
($"fileStringPreview"
.startsWith("emptyUrl") === true)),
lit("failed")).otherwise(
lit("succeeded")))
完成此操作后,检查点将按此迭代中处理的元素的最新修改日期更新。
def saveLatest(saved_df: DataFrame, timeSeriesColName: String): Unit = {
val latestTime = saved_df.agg(max(timeSeriesColName)).collect()(0)
try {
val timespanEnd = latestTime.getTimestamp(0).toInstant().toEpochMilli()
saveTimestamp(timespanEnd) // this function actually stores the data
} catch {
case e: java.lang.NullPointerException => {
LoggingWrapper.log("timespanEnd is null");
}
}
}
saveLatest(tryDownload, "modified")
我担心这个限制(5000)解决方案,有没有更好的方法,可以在每次迭代中保持下载指定数量的文件的良好性能?
提前感谢您的建议!:)
解决方案
推荐阅读
- excel - 在 VBA 中跨定义的列集合并行
- python - 循环和字典
- mysql - 是否有一个 MySql SELECT 会给出 A 列与 B 列的结果?
- dosbox - 需要RL51不挂机运行
- python - 获取 Numpy 数组所有权的函数
- python - 无法修改 MDSelectionList 属性
- html - 如何在没有滚动条的情况下制作具有首字下沉响应的两列布局?
- sql - java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback$class
- sql - 基于多列平均列
- mysql - 我在表中有生日,我需要找到 30 到 70 之间的年龄,我该如何编写 sql 查询?