首页 > 解决方案 > 动态向数据框添加列时如何避免序列化错误?

问题描述

我正在尝试遍历包含要使用窗口函数排名的列的列名列表,并添加一个新列,其中包含源列的结果顶部值。为此,我将输入数据帧声明为 for 循环外的变量,并在循环内对其进行更新。根据我的理解,Task not serializable这可能是由于在主节点上声明了变量 df ,然后 for 循环试图在工作节点上访问它。我可以使用相同的逻辑避免这个错误,还是有不同的方法可以添加这些列?

 def getHighestScoredAttributes(scoredDF: DataFrame, attributes: Array[String]) : DataFrame = {
    var df = scoredDF
    for (attribute <- attributes) {
      val maxValidWindow = Window.partitionBy(df("druid")).orderBy(
        when(df("validity") === lit("valid"), lit(1)).otherwise(lit(0)).desc,
        when(df(attribute).isNotNull, lit(1)).otherwise(lit(0)).desc,
        df("rank").desc_nulls_last)
      val maxInvalidWindow = Window.partitionBy(df("druid")).orderBy(
        when(df("validity") === lit("invalid"), lit(1)).otherwise(lit(0)).desc,
        when(df(attribute).isNotNull, lit(1)).otherwise(lit(0)).desc,
        df("rank").desc_nulls_last)
      df = df.withColumn("valid_" + attribute, first(attribute) over maxValidWindow)
        .withColumn("valid_" + attribute + "_dt", first("attest_dt") over maxValidWindow)
        .withColumn("invalid_" + attribute, first(attribute) over maxInvalidWindow)
        .withColumn("invalid_" + attribute + "_dt", first("attest_dt") over maxInvalidWindow)
    }
    df
  }

标签: scaladataframeapache-sparkserialization

解决方案


原来问题不在于上面的代码,而在于 orderby 中的排名列。它使用窗口函数在此函数范围之外声明,但在函数体内延迟评估,导致此错误。用@transient 标记那个Window val 可以解决这个问题。


推荐阅读