scala - Spark 在 foreachRDD 操作中抛出不可序列化异常
问题描述
我正在尝试使用 scala 和 spark 流实现观察者模式。这个想法是,每当我从流(来自kafka)收到一条记录时,我通过在闭包内调用方法“notifyObservers”来通知观察者。这是代码:
流由 kafka utils 提供。方法 notifyObserver 被定义为遵循模式规则的抽象类。我认为该错误与方法无法序列化有关。我的想法正确吗?如果是,我应该遵循什么样的解决方案?谢谢
def onMessageConsumed() = {
stream.foreachRDD(rdd => {
rdd.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
//notify observers with the record to compute
notifyObservers(record)
})
})
}
解决方案
是的,在发送给其他执行器(在foreach
等中执行)的代码中使用的类应该实现Serializable
接口。
另外,如果您的通知代码需要连接到某些资源,则需要换行foreach
,foreachPartition
如下所示:
stream.foreachRDD(rdd => {
rdd.foreachPartition(rddPartition =>
// setup connection to external component
rddPartition.foreach(consumerRecord => {
val record = new Record[T](consumerRecord.topic(),
consumerRecord.value())
notifyObservers(record)
})
// close connection to external component
})
})
推荐阅读
- sql - 处理 BigQuery 中未嵌套记录导致的重复行的最佳做法?
- javascript - 为什么“父”的值得到“子”的值的总和,而不是TreeMap中自身的值?
- javascript - mirage.js 多通配符直通
- angular - 来自 REST API 的 Angular 6 @ngx-translate 翻译
- php - 使用 cron 调用 php 脚本的问题
- kubernetes - 使用 Argo Workflows 创建队列系统
- leaflet - tileLayer和控件的Leaflet js问题
- django - django - 从用户发布的基于类的视图
- parsing - 解析器/语法:嵌套规则中的 2x 括号
- xamarin.forms - 从 xamarin 表单中的 xml 响应的 url 获取多个图像