getPersistentRDDs in Spark 2.2.0 returns a Map of cached RDDs and DataFrames, but in Spark 2.4.7 it returns only a Map of cached RDDs

Problem description

In Spark 2.2.0, if both an RDD and a DataFrame are cached, getPersistentRDDs returns a Map of size 2:

scala> val rdd = sc.parallelize(Seq(1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> spark.sparkContext.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache
res1: df.type = [value: int]

scala> spark.sparkContext.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
 MapPartitionsRDD[4] at cache at <console>:27)

scala> rdd.cache
res3: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24, 4 -> *SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
 MapPartitionsRDD[4] at cache at <console>:27)

But in Spark 2.4.7, getPersistentRDDs returns a Map of size 1:

...
scala> spark.sparkContext.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(0 -> ParallelCollectionRDD[0] at parallelize at <console>:24)

So the question is: how can all cached objects be retrieved, not just the RDDs, and what changed in this method that made it suddenly behave differently?

Tags: scala, apache-spark, rdd

Solution


The DataFrame is not actually cached in memory yet, because no action has been executed against it, so it is excluded from getPersistentRDDs. I would say the behaviour in the later version is actually desirable. But as soon as you run an action on the DataFrame, it gets cached and shows up in the result of getPersistentRDDs, as shown below:

scala> val df = sc.parallelize(Seq(2)).toDF
df: org.apache.spark.sql.DataFrame = [value: int]

scala> sc.getPersistentRDDs
res0: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.cache
res1: df.type = [value: int]

scala> sc.getPersistentRDDs
res2: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map()

scala> df.count()
res3: Long = 1

scala> sc.getPersistentRDDs
res4: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] =
Map(3 -> *(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]
 MapPartitionsRDD[3] at count at <console>:26)
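As an addendum to the question "how to get all cached objects, not just RDDs" (this is not from the original answer, just a minimal sketch): Dataset.storageLevel reports whether a DataFrame has been marked for caching even before an action materializes it, while sc.getPersistentRDDs only lists RDDs already registered with the SparkContext. The sketch below assumes a spark-shell session (so sc, spark and the implicits are already in scope); the names rdd and df are purely illustrative.

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(Seq(1))
val df  = sc.parallelize(Seq(2)).toDF

rdd.cache()   // RDDs are registered with the SparkContext immediately
df.cache()    // DataFrames are only *marked* for caching; nothing is materialized yet

// storageLevel reflects the caching mark even before any action runs
println(rdd.getStorageLevel != StorageLevel.NONE)  // true
println(df.storageLevel != StorageLevel.NONE)      // true
println(sc.getPersistentRDDs.size)                 // 1 on Spark 2.4.x: only the RDD so far

df.count()                                         // first action materializes the cached data
println(sc.getPersistentRDDs.size)                 // the DataFrame's underlying RDD should now appear as well

In other words, getPersistentRDDs only ever lists RDDs the SparkContext has actually registered, whereas storageLevel answers the narrower question of whether a given Dataset is marked for caching.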
