scala - 如何为类型 Iterator[org.apache.spark.sql.Row] 创建编码器
问题描述
我在 databricks 笔记本中使用 spark 2.4.4。我在数据框中有一个数据,我想用它来更新 Postgre 表中的记录。我正在遵循这篇文章Spark Dataframes UPSERT to Postgres Table中给出的方法
这是我的代码
import spark.implicits._
val update_query = s"""UPDATE scored_fact.f_learner_assessment_item_response_classifications_test SET is_deleted = ? where f.learner_assigned_item_classification_attempt_sk = ?::uuid AND f.root_org_partition= ?::int"""
changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
val dbc: Connection = DriverManager.getConnection(connectionUrl)
val stmt: PreparedStatement = dbc.prepareStatement(update_query)
batch.grouped(100).foreach { session =>
session.foreach { row =>
stmt.setBoolean( 0, row.getAs[Boolean]("is_deleted") )
stmt.setString( 1, row.getAs[String]("learner_assigned_item_classification_attempt_sk"))
stmt.setString( 2, row.getAs[String]("root_org_partition"))
stmt.addBatch()
}
stmt.executeBatch()
}
dbc.close()
}
我得到以下错误
Unable to find encoder for type Iterator[org.apache.spark.sql.Row]. An implicit Encoder[Iterator[org.apache.spark.sql.Row]] is needed to store Iterator[org.apache.spark.sql.Row] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.changedSectionLearnerDF.coalesce(8).mapPartitions((d) => Iterator(d)).foreach { batch =>
我确定我错过了一些东西。如何通过创建编码器来解决此错误
解决方案
的签名mapPartitions
是
def mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
所以d
ind => Iterator(d)
是一个Iterator[Row]
并且函数返回一个Dataset[Iterator[Row]]
不能合理存在的。
我认为这个mapPartitions
电话完全是错误的,.mapPartitions((d) => Iterator(d)).foreach
应该被替换为foreachPartition
(正如@shridharama 对链接答案的评论所说)。
推荐阅读
- rust - 来自 Rc 的地图
> 到 Ref<'_, U> - excel - Excel公式查看多列以确定
- javascript - 如何从 Trimble Maps JavaScript Maps SDK 中的路由实例获取报告?
- java - Runnable 在 Android 上挂起 UI 线程
- amazon-web-services - 无法在 react-native 中列出 S3 存储桶
- c++ - C ++重载单个运算符将2个结构作为指针传递
- arrays - 将数组对象保存到 MongoDB 数据库时出现 Mongoose “VersionError”
- android - 如何在 Kotlin Firebase 中计算树的孩子?
- python - 如何停止窗口抖动
- vim - 我粘贴时 Vim 删除换行符