Why does a Spark "object not serializable" exception occur when changing an RDD to a DataFrame?

Problem description

I am using Structured Streaming, and the following code works:

val j = new Jedis() // a Redis client, which is not serializable

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.rdd.mapPartitions(...)
}}

But the following code throws an exception: object not serializable (class: redis.clients.jedis.Jedis, value: redis.clients.jedis.Jedis@a8e0378)

The code has only one change (the RDD was changed to a DataFrame):

val j = new Jedis() // a Redis client, which is not serializable

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.mapPartitions(...)  // the only change: batchDF.rdd becomes batchDF
}}

My Jedis code should execute on the driver and never reach the executors. I assumed the Spark RDD and DataFrame APIs would behave similarly here. Why does this happen?
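For context, the message comes from plain Java serialization: whatever Spark decides must travel with a task (the closure plus everything it captures) has to pass through an ObjectOutputStream. A minimal sketch of that check, runnable on the driver without Spark (the helper name is illustrative, not part of any API):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import redis.clients.jedis.Jedis

object SerializabilityCheck {
  // Returns None if `obj` serializes cleanly, or the offending class name if not.
  def check(obj: AnyRef): Option[String] =
    try {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      oos.writeObject(obj)
      oos.close()
      None
    } catch {
      case e: java.io.NotSerializableException => Some(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    val j = new Jedis()
    val closure = (n: Int) => { j.toString; n + 1 } // captures the Jedis instance
    println(check(closure)) // Some(redis.clients.jedis.Jedis)
  }
}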


I Ctrl-clicked into the lower-level code. batchDF.mapPartitions goes to:

  @Experimental
  @InterfaceStability.Evolving
  def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
    new Dataset[U](
      sparkSession,
      MapPartitions[T, U](func, logicalPlan),
      implicitly[Encoder[U]])
  }

and batchDF.rdd.mapPartitions goes to:

  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
      preservesPartitioning)
  }

My Spark version is 2.4.3.
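From the two snippets above, the visible difference is that the RDD overload passes the function through sc.clean(f) (Spark's ClosureCleaner) before it is shipped, while the Dataset overload embeds func in a MapPartitions logical-plan node and also requires an Encoder for the result type. A small, spark-shell-style sketch of the two call shapes with the Encoder made explicit (the helper and the toy mapping are illustrative only):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, Row}

// Illustrative helper: the same per-partition mapping through both APIs.
def bothCallShapes(df: DataFrame): (RDD[Int], Dataset[Int]) = {
  val f: Iterator[Row] => Iterator[Int] = rows => rows.map(_.size)

  // RDD path: f goes through sc.clean(f) before it is shipped to executors.
  val viaRdd: RDD[Int] = df.rdd.mapPartitions(f)

  // Dataset path: f is wrapped in a MapPartitions logical-plan node and an
  // Encoder[Int] must be supplied (here explicitly, usually via spark.implicits._).
  val viaDataset: Dataset[Int] = df.mapPartitions(f)(Encoders.scalaInt)

  (viaRdd, viaDataset)
}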

Below is the simplest version of my code, and I just noticed something else...

val j = new Jedis() // a Redis client, which is not serializable

xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
  j.xtrim(...)... // call function of Jedis here
  batchDF.mapPartitions(x => {
    val arr = x.grouped(2).toArray // this line matters
  })
  // the only change: batchDF.rdd becomes batchDF
}}
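As written, this simplified snippet would not compile as-is: the lambda returns Unit rather than an Iterator, and the Dataset overload of mapPartitions needs an Encoder for the result type (see the signature quoted above). A compiling variant, assuming the intent is only to exercise grouped() inside the partition function and that a SparkSession named spark is in scope:

import spark.implicits._ // supplies the Encoder[Int] for the result type

batchDF.mapPartitions { rows =>
  val arr = rows.grouped(2).toArray // the line flagged above as mattering
  arr.iterator.map(_.length)        // mapPartitions must return an Iterator
}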

Tags: apache-spark, apache-spark-sql, spark-structured-streaming

Solution


See this DataFrame API implementation:

Internally it calls rdd.mapPartitions with your function.

  /**
   * Returns a new RDD by applying a function to each partition of this DataFrame.
   * @group rdd
   * @since 1.3.0
   */
  def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
    rdd.mapPartitions(f)
  }

There is no difference between the two; you may have done something wrong elsewhere.

AFAIK, ideally it should look like this:

batchDF.mapPartitions { partition =>
  // better to create a JedisPool and borrow an instance rather than new Jedis
  val j = new Jedis()
  val result = partition.map { row =>
    // do some processing here
    row
  }.toList // Iterator.map is lazy, so materialize before closing the client
  j.close() // release and take care of connections/resources here
  result.iterator
}
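The comment about JedisPool is worth expanding. A pattern commonly used for non-serializable clients is to keep the pool in a singleton object, so each executor JVM creates one pool lazily and partitions only borrow and return connections; nothing Redis-related is captured by the closure. A rough sketch along those lines (the object name, host, and per-row handling are illustrative, not from the original answer):

import org.apache.spark.sql.Encoders
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Define this at top level so executors resolve it as a static singleton:
// Scala objects are initialized lazily on first use and are not serialized
// as part of a task closure.
object RedisPool {
  lazy val pool = new JedisPool(new JedisPoolConfig(), "localhost")
}

batchDF.mapPartitions { rows =>
  val j: Jedis = RedisPool.pool.getResource // borrow one connection per partition
  try {
    rows.map { row =>
      val line = row.mkString(",")
      j.rpush("batch-output", line) // example Redis call; replace with real logic
      line
    }.toList.iterator // materialize before the connection is returned
  } finally {
    j.close() // returns the connection to the pool
  }
}(Encoders.STRING)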
