scala - 任务不可序列化 Spark
问题描述
我正在尝试遍历 2 个不同的数据帧,并在此过程中检查其中一个数据帧中的值是否位于指定的一组值中,但我得到 org.apache.spark.SparkException: Task not serializable。
如何改进我的代码以修复此错误?
这是它现在的样子:
private val unix_epoch = udf[Long, String, String]{ (date, time) =>
DateTimeFormat.forPattern("yyyyMMdd HHmmSS").parseDateTime(s"$date $time").getSeconds
}
for {
helpers <- helperFeed.dfFromDate(helperStartDateTime)
dstHours <- feed.dfFromDate(startDateTime).map(_.select($"hour").distinct().as[Int].collect().toSeq)
} {
sparkSession.delta(srcPath).map { srcFeed =>
val srcHours = srcConversions
.withColumn("hour", unix_epoch($"event_date", $"event_time"))
.where($"hour" isin (dstHours: _*))
我想在使用过滤器添加 where 子句后会出现此错误。但是我还能如何实现它呢?我也尝试将 srcHours 和 dstHours 作为数据框加入,但我得到了同样的错误。
更新: dfFromDate 只是通过将它们与 startdate 进行比较来限制数据框中的值(应该是> startdate)
解决方案
基本上,问题是无法序列化的 DateTimeFormat 。有多种观点可以解决这个问题。我决定将 DateTimeFormat 定义移动到类的伴随对象中,这对我有用。
推荐阅读
- realm - 无法通过 NSDate 属性正确查询领域结果
- extjs - ExtJS 4.2树形面板突出显示单元格选择行
- javascript - 如何从nodejs调用特定的python函数?
- javascript - 显示文档名称
- jenkins - 如何在 Jenkinsfile 中进行间接变量替换
- android - 包含 .so 文件时,Android 应用程序包会生成巨大的 APK
- join - 运行 6 台 ignite 服务器时获取不完整的数据
- react-native - 我们可以用 react Native 开发网站吗?他们有什么办法吗?
- elasticsearch - ELK Stack 服务的默认端口是什么?
- docker - 将变量从 gitlab-ci.yml 传递到 Dockerfile 而不使用 docker build