How to create an encoder for type Iterator[org.apache.spark.sql.Row]

Problem description

I am using Spark 2.4.4 in a Databricks notebook. I have data in a DataFrame that I want to use to update records in a Postgres table. I am following the approach given in the post Spark Dataframes UPSERT to Postgres Table.

Here is my code:

import java.sql.{Connection, DriverManager, PreparedStatement}
import spark.implicits._

val update_query = s"""UPDATE scored_fact.f_learner_assessment_item_response_classifications_test SET is_deleted = ? where f.learner_assigned_item_classification_attempt_sk = ?::uuid AND f.root_org_partition= ?::int"""


changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
  val dbc: Connection = DriverManager.getConnection(connectionUrl)
  val stmt: PreparedStatement = dbc.prepareStatement(update_query)

  batch.grouped(100).foreach { session =>
    session.foreach { row =>
      stmt.setBoolean(0, row.getAs[Boolean]("is_deleted"))
      stmt.setString(1, row.getAs[String]("learner_assigned_item_classification_attempt_sk"))
      stmt.setString(2, row.getAs[String]("root_org_partition"))
      stmt.addBatch()
    }
    stmt.executeBatch()
  }
  dbc.close()
}

I get the following error:

Unable to find encoder for type Iterator[org.apache.spark.sql.Row]. An implicit Encoder[Iterator[org.apache.spark.sql.Row]] is needed to store Iterator[org.apache.spark.sql.Row] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>

I am sure I am missing something. How can I resolve this error by creating an encoder?

Tags: scala, apache-spark, apache-spark-sql

Solution


The signature of mapPartitions is:

def mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U] 

So in (d) => Iterator(d), d is already an Iterator[Row], which means the function returns a Dataset[Iterator[Row]], a type that cannot reasonably exist: there is no Encoder for Iterator[Row], hence the error.
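For contrast, here is a minimal sketch of a well-typed mapPartitions call (a hypothetical example, not from the question): the function consumes the Iterator[Row] and returns an iterator of a type that does have an encoder, here Int:

import spark.implicits._

// Hypothetical example: count the rows in each partition.
// Int is a primitive type, so Encoder[Int] is provided by spark.implicits._,
// and the result is a Dataset[Int].
val partitionSizes = changedSectionLearnerDF
  .mapPartitions(rows => Iterator(rows.size))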

I think the mapPartitions call is simply wrong here: .mapPartitions((d) => Iterator(d)).foreach should be replaced with foreachPartition (as @shridharama's comment on the linked answer says), as sketched below.
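A minimal sketch of the corrected loop, reusing connectionUrl and update_query from the question (untested here; note also that JDBC PreparedStatement parameter indices start at 1, so the 0/1/2 indices in the original code would fail once the encoder error is fixed):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.Row

changedSectionLearnerDF.coalesce(8).foreachPartition { batch: Iterator[Row] =>
  // One connection per partition, opened on the executor.
  val dbc: Connection = DriverManager.getConnection(connectionUrl)
  val stmt: PreparedStatement = dbc.prepareStatement(update_query)
  try {
    batch.grouped(100).foreach { group =>
      group.foreach { row =>
        // JDBC parameters are 1-based.
        stmt.setBoolean(1, row.getAs[Boolean]("is_deleted"))
        stmt.setString(2, row.getAs[String]("learner_assigned_item_classification_attempt_sk"))
        stmt.setString(3, row.getAs[String]("root_org_partition"))
        stmt.addBatch()
      }
      stmt.executeBatch()
    }
  } finally {
    dbc.close()
  }
}

foreachPartition is an action that returns Unit, so no Encoder is required: the encoder error disappears because nothing is being stored back into a Dataset.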

