scala - Spark 作业在不执行 udf 的情况下完成
问题描述
我一直在处理包含 udf 的冗长复杂的 spark 作业时遇到问题。
我遇到的问题是 udf 似乎没有被正确调用,尽管没有错误消息。
我知道它没有被正确调用,因为输出被写入,只有 udf 应该计算的任何东西都是NULL,并且在本地调试时没有打印语句出现。
唯一的线索是该代码以前使用不同的输入数据工作,这意味着错误必须与输入有关。
输入的变化主要意味着使用了不同的列名,这在代码中已解决。
在给定第一个“工作”输入的情况下执行打印语句。
两个输入都是使用来自同一个数据库的同一系列步骤创建的,通过检查,这两个输入似乎都没有问题。
我以前从未经历过这种行为,任何可能导致这种行为的线索将不胜感激。
代码是单一且不灵活的 - 我正在重构,但它不是一个容易分解的部分。这是正在发生的事情的简短版本:
package mypackage
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._
import scala.collection.{Map => SMap}
object MyObject {
def main(args: Array[String]){
val spark: SparkSession = SparkSession.builder()
.appName("my app")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val bigInput = spark.read.parquet("inputname.parquet")
val reference_table = spark.read.parquet("reference_table.parquet")
val exchange_rate = spark.read.parquet("reference_table.parquet")
val bigInput2 = bigInput
.filter($"column1" === "condition1")
.join(joinargs)
.drop(dropargs)
val bigInput3 = bigInput
.filter($"column2" === "condition2")
.join(joinargs)
.drop(dropargs)
<continue for many lines...>
def mapper1(
arg1: String,
arg2: Double,
arg3: Integer
): List[Double]{
exchange_rate.map(
List(idx1, idx2, idx3),
r.toSeq.toList
.drop(idx4)
.take(arg2)
)
}
def mapper2(){}
...
def mapper5(){}
def my_udf(
arg0: Integer,
arg1: String,
arg2: Double,
arg3: Integer,
...
arg20: String
): Double = {
println("I'm actually doing something!")
val result1 = mapper1(arg1, arg2, arg3)
val result2 = mapper2(arg4, arg5, arg6, arg7)
...
val result5 = mapper5(arg18, arg19, arg20)
result1.take(arg0)
.zipAll(result1, 0.0, 0.0)
.map(x=>_1*x._2)
....
.zipAll(result5, 0.0, 0.0)
.foldLeft(0.0)(_+_)
}
spark.udf.register("myUDF", my_udf_)
val bigResult1 = bigInputFinal.withColumn("Newcolumnname",
callUDF(
"myUDF",
$"col1",
...
$"col20"
)
)
<postprocessing>
bigResultFinal
.filter(<configs>)
.select(<column names>)
.write
.format("parquet")
}
}
回顾一下
此代码在两个输入文件中的每一个上运行完成。
udf 似乎只在第一个文件上执行。
尽管所有非 udf 逻辑似乎都成功完成,但没有错误消息或使用第二个文件的任何内容。
非常感谢任何帮助!
解决方案
这里没有调用 UDF,因为 spark 是惰性的,除非您对数据帧使用任何操作,否则它不会调用 UDF。您可以通过强制数据框操作来实现这一点。
推荐阅读
- hadoop - YARN RM 在主密钥滚动期间不断尝试为应用程序设置新的 AMRMToken
- typescript - 在 Angular 6 中使用 D3-Lasso
- windows - 从 WSUS 服务器下载 Windows 更新
- r - AWS lambda R 运行时分段错误
- dask - pydata BLAZE 项目的方向在哪里?
- node.js - 让节点服务器在继续之前等待客户端输入
- jupyter-notebook - nbconvert 乳胶图形定位
- php - 仅当表包含来自不同行的多个所需值时,如何从表中获取 ID
- javascript - 在对象内迭代数组并相应地添加属性
- javascript - 重建时未设置配置选项 - 引导多选