scala - 动态向数据框添加列时如何避免序列化错误?
问题描述
我正在尝试遍历包含要使用窗口函数排名的列的列名列表,并添加一个新列,其中包含源列的结果顶部值。为此,我将输入数据帧声明为 for 循环外的变量,并在循环内对其进行更新。根据我的理解,Task not serializable
这可能是由于在主节点上声明了变量 df ,然后 for 循环试图在工作节点上访问它。我可以使用相同的逻辑避免这个错误,还是有不同的方法可以添加这些列?
def getHighestScoredAttributes(scoredDF: DataFrame, attributes: Array[String]) : DataFrame = {
var df = scoredDF
for (attribute <- attributes) {
val maxValidWindow = Window.partitionBy(df("druid")).orderBy(
when(df("validity") === lit("valid"), lit(1)).otherwise(lit(0)).desc,
when(df(attribute).isNotNull, lit(1)).otherwise(lit(0)).desc,
df("rank").desc_nulls_last)
val maxInvalidWindow = Window.partitionBy(df("druid")).orderBy(
when(df("validity") === lit("invalid"), lit(1)).otherwise(lit(0)).desc,
when(df(attribute).isNotNull, lit(1)).otherwise(lit(0)).desc,
df("rank").desc_nulls_last)
df = df.withColumn("valid_" + attribute, first(attribute) over maxValidWindow)
.withColumn("valid_" + attribute + "_dt", first("attest_dt") over maxValidWindow)
.withColumn("invalid_" + attribute, first(attribute) over maxInvalidWindow)
.withColumn("invalid_" + attribute + "_dt", first("attest_dt") over maxInvalidWindow)
}
df
}
解决方案
原来问题不在于上面的代码,而在于 orderby 中的排名列。它使用窗口函数在此函数范围之外声明,但在函数体内延迟评估,导致此错误。用@transient 标记那个Window val 可以解决这个问题。
推荐阅读
- dart - 颤振补间基本动画在“FutureBuilder”中不起作用
- javascript - 第二次访问屏幕时 React Native null is not an object (evalating '_this2.state.data') 错误
- swift - 为什么 UIStackView 调整排列的子视图的大小?
- logback - Flink 日志记录限制:如何将日志记录配置传递给 flink 作业
- architecture - 对于特定时间间隔的重复任务队列或 cron 哪个更好?
- android - 在为 x86 构建的 Android SDK 上启动应用程序时出错
- javascript - For 循环在实际发生操作之前将日志打印到控制台
- c++ - 包含指针的函数中的错误?
- sql-server - 与 SQL Server(Linux 容器)的元数据库连接出现超时错误
- search - 如何获取所有 Pdo 搜索