apache-spark - 为什么将RDD更改为DataFrame时会出现Spark不可序列化的异常?
问题描述
我正在使用结构化流媒体并且以下代码有效
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.rdd.mapPartitions(...)
}}
但是下面的代码会抛出异常,object not serializable (class: redis.clients.jedis.Jedis, value: redis.clients.jedis.Jedis@a8e0378)
代码只有一处改动(将RDD改为DataFrame):
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.mapPartitions(...) // only change is change batchDF.rdd to batchDF
}}
我的Jedis
代码应该在驱动程序上执行,并且永远不会到达执行程序。我想 Spark RDD 和 DataFrame 应该有类似的 APIS?为什么会发生这种情况?
我使用 ctrl 进入较低级别的代码。batchDF.mapPartitions
去_
@Experimental
@InterfaceStability.Evolving
def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] =
{
new Dataset[U](
sparkSession,
MapPartitions[T, U](func, logicalPlan),
implicitly[Encoder[U]])
}
然后batchDF.rdd.mapPartitions
去
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}
我的 Spark 版本是 2.4.3。
下面是我最简单的代码版本,我刚刚发现了其他东西......
val j = new Jedis() // an redis client which is not serializable.
xx.writeStream.foreachBatch{(batchDF: DataFrame, batchId: Long) => {
j.xtrim(...)... // call function of Jedis here
batchDF.mapPartitions(x => {
val arr = x.grouped(2).toArray // this line matters
})
// only change is change batchDF.rdd to batchDF
}}
解决方案
在内部调用您的函数的 rdd.mapPartitions。
/**
* Returns a new RDD by applying a function to each partition of this DataFrame.
* @group rdd
* @since 1.3.0
*/
def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R] = {
rdd.mapPartitions(f)
}
在其他地方你可能做错了没有区别。
AFAIK,理想情况下应该是这样
batchDF.mapPartitions { yourparition =>
// better to create a JedisPool and take object rather than new Jedis
val j = new Jedis()
val result = yourparition.map {
// do some process here
}
j.close // release and take care of connections/ resources here
result
}
}
推荐阅读
- cordova - 无法读取未定义的属性“getPicture”
- android - 当我们在 recyclerview 中使用 searchview 时,我们如何才能将搜索到的字符着色?
- r - 将宽数据框收集成窄数据框,但展开以创建临时行
- push-notification - 无法在离子应用程序中查看通知图标
- sql-server - 该序列不包含任何元素
- vb6 - 鼠标单击打印对话框不起作用
- java - 使用docker和java spring时keycloak认证问题
- android - 无法从 createBestAvailableBackgroundScheduler 的方法中找到 androidx 的类 - Android
- css - 何时在 css 中使用 width % 和 vw
- raspberry-pi - 树莓派 4 上的 UART