首页 > 解决方案 > 任务不可序列化 Spark

问题描述

我正在尝试遍历 2 个不同的数据帧,并在此过程中检查其中一个数据帧中的值是否位于指定的一组值中,但我得到 org.apache.spark.SparkException: Task not serializable。

如何改进我的代码以修复此错误?

这是它现在的样子:

private val unix_epoch = udf[Long, String, String]{ (date, time) =>
    DateTimeFormat.forPattern("yyyyMMdd HHmmSS").parseDateTime(s"$date $time").getSeconds
  }

for {
      helpers <- helperFeed.dfFromDate(helperStartDateTime)
      dstHours <- feed.dfFromDate(startDateTime).map(_.select($"hour").distinct().as[Int].collect().toSeq)
    } {
     sparkSession.delta(srcPath).map { srcFeed =>

val srcHours = srcConversions
          .withColumn("hour", unix_epoch($"event_date", $"event_time"))
          .where($"hour" isin (dstHours: _*))

我想在使用过滤器添加 where 子句后会出现此错误。但是我还能如何实现它呢?我也尝试将 srcHours 和 dstHours 作为数据框加入,但我得到了同样的错误。

更新: dfFromDate 只是通过将它们与 startdate 进行比较来限制数据框中的值(应该是> startdate)

标签: scalaapache-sparkapache-spark-sql

解决方案


基本上,问题是无法序列化的 DateTimeFormat 。有多种观点可以解决这个问题。我决定将 DateTimeFormat 定义移动到类的伴随对象中,这对我有用。


推荐阅读