首页 > 解决方案 > Spark 在 foreachRDD 操作中抛出不可序列化异常

问题描述

我正在尝试使用 scala 和 spark 流实现观察者模式。这个想法是,每当我从流(来自kafka)收到一条记录时,我通过在闭包内调用方法“notifyObservers”来通知观察者。这是代码:

流由 kafka utils 提供。方法 notifyObserver 被定义为遵循模式规则的抽象类。我认为该错误与方法无法序列化有关。我的想法正确吗?如果是,我应该遵循什么样的解决方案?谢谢

def onMessageConsumed() = {
    stream.foreachRDD(rdd => {
      rdd.foreach(consumerRecord => {
        val record = new Record[T](consumerRecord.topic(), 
                                   consumerRecord.value())
        //notify observers with the record to compute
        notifyObservers(record)
      })
    })
  }

标签: scalaapache-kafkaspark-streaming

解决方案


是的,在发送给其他执行器(在foreach等中执行)的代码中使用的类应该实现Serializable接口。

另外,如果您的通知代码需要连接到某些资源,则需要换行foreachforeachPartition如下所示:

stream.foreachRDD(rdd => {
   rdd.foreachPartition(rddPartition =>
      // setup connection to external component      
      rddPartition.foreach(consumerRecord => {
        val record = new Record[T](consumerRecord.topic(), 
                                   consumerRecord.value())
        notifyObservers(record)
      })
      // close connection to external component
   })
  })

推荐阅读