首页 > 解决方案 > Spark 作业在不执行 udf 的情况下完成

问题描述

我一直在处理包含 udf 的冗长复杂的 spark 作业时遇到问题。

我遇到的问题是 udf 似乎没有被正确调用,尽管没有错误消息。

我知道它没有被正确调用,因为输出被写入,只有 udf 应该计算的任何东西都是NULL,并且在本地调试时没有打印语句出现。

唯一的线索是该代码以前使用不同的输入数据工作,这意味着错误必须与输入有关。

输入的变化主要意味着使用了不同的列名,这在代码中已解决。

在给定第一个“工作”输入的情况下执行打印语句。

两个输入都是使用来自同一个数据库的同一系列步骤创建的,通过检查,这两个输入似乎都没有问题。

我以前从未经历过这种行为,任何可能导致这种行为的线索将不胜感激。

代码是单一且不灵活的 - 我正在重构,但它不是一个容易分解的部分。这是正在发生的事情的简短版本:

package mypackage

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.util._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.sql.types._

import scala.collection.{Map => SMap}


object MyObject {

  def main(args: Array[String]){
    val spark: SparkSession = SparkSession.builder()
      .appName("my app")
      .config("spark.master", "local")
      .getOrCreate()

    import spark.implicits._

    val bigInput = spark.read.parquet("inputname.parquet")
    val reference_table = spark.read.parquet("reference_table.parquet")
    val exchange_rate = spark.read.parquet("reference_table.parquet")


    val bigInput2 = bigInput
      .filter($"column1" === "condition1")
      .join(joinargs)
      .drop(dropargs)

    val bigInput3 = bigInput
      .filter($"column2" === "condition2")
      .join(joinargs)
      .drop(dropargs)

    <continue for many lines...>

    def mapper1(
      arg1: String,
      arg2: Double,
      arg3: Integer
    ): List[Double]{
      exchange_rate.map(
        List(idx1, idx2, idx3),
        r.toSeq.toList
          .drop(idx4)
          .take(arg2)
      )
    }

    def mapper2(){}
    ...
    def mapper5(){}

    def my_udf(
      arg0: Integer,
      arg1: String,
      arg2: Double,
      arg3: Integer,
      ...
      arg20: String
    ): Double = {
      println("I'm actually doing something!")
      val result1 = mapper1(arg1, arg2, arg3)
      val result2 = mapper2(arg4, arg5, arg6, arg7)
      ...
      val result5 = mapper5(arg18, arg19, arg20)
      result1.take(arg0)
        .zipAll(result1, 0.0, 0.0)
        .map(x=>_1*x._2)
        ....
        .zipAll(result5, 0.0, 0.0)
        .foldLeft(0.0)(_+_)
    }

    spark.udf.register("myUDF", my_udf_)
    val bigResult1 = bigInputFinal.withColumn("Newcolumnname",
      callUDF(
        "myUDF",
        $"col1",
        ...
        $"col20"
      )
    )

    <postprocessing>
    bigResultFinal
        .filter(<configs>)
        .select(<column names>)
        .write
        .format("parquet")
  }
}

回顾一下

此代码在两个输入文件中的每一个上运行完成。

udf 似乎只在第一个文件上执行。

尽管所有非 udf 逻辑似乎都成功完成,但没有错误消息或使用第二个文件的任何内容。

非常感谢任何帮助!

标签: scalaapache-spark

解决方案


这里没有调用 UDF,因为 spark 是惰性的,除非您对数据帧使用任何操作,否则它不会调用 UDF。您可以通过强制数据框操作来实现这一点。


推荐阅读