Spark Checkpointing Non-Streaming - Checkpoint files can be used in subsequent job runs or driver programs

Problem Description

This question comes from an interesting article: http://www.lifeisafile.com/Apache-Spark-Caching-Vs-Checkpointing/

" ... Checkpointing 将 rdd 物理存储到 hdfs 并销毁创建它的 lineage。即使在 Spark 应用程序终止后,检查点文件也不会被删除。检查点文件可用于后续作业运行或驱动程序。检查点 RDD导致双重计算,因为该操作将在执行计算和写入检查点目录的实际工作之前首先调用缓存。..."

I seem to remember reading elsewhere that checkpoint files are only usable for the job, or shared jobs, within a given Spark application.

Looking for clarification, and for how a new application could use the checkpoint directory, as I believe this is not possible.

Tags: apache-spark, checkpointing

Solution


"I seem to remember reading elsewhere that checkpoint files are only usable for the job, or shared jobs, within a given Spark application."

Checkpoint files are not removed by default, even after the SparkContext is stopped. We can turn on automatic cleanup by setting the following property:

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

"Looking for clarification, and for how a new application could use the checkpoint directory, as I believe this is not possible."

To reuse the checkpointed dataset, we can follow these steps:

  1. Start context 1 and checkpoint the dataset:
// Setting logger on for ReliableRDDCheckpointData
scala> import org.apache.log4j.{Level, Logger}
scala> Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

// Note application ID
scala> spark.sparkContext.applicationId
res1: String = local-1567969150914

// Set checkpoint Dir
scala> spark.sparkContext.setCheckpointDir("/tmp/spark/checkpoint")

// File system location
Users-Air:checkpoint User$ pwd
/tmp/spark/checkpoint
Users-Air:checkpoint User$ ls -lrth
total 0
drwxr-xr-x  2 User  wheel    64B Sep  8 15:00 7aabcb46-e707-49dd-8893-148a162368d5

// Create Dataframe
scala> val df = spark.range(3).withColumn("random", rand())
scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+

scala> df.schema
res5: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))

// Checkpoint (Dataset.checkpoint is eager by default and returns a new Dataset with truncated lineage)
scala> df.checkpoint
19/09/08 15:02:22 INFO ReliableRDDCheckpointData: Done checkpointing RDD 7 to file:/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7, new parent is RDD 8
res6: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, random: double]

// New RDD saved in checkpoint directory /tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7
Users-Air:7aabcb46-e707-49dd-8893-148a162368d5 User$ cd rdd-7/
Users-Air:rdd-7 User$ ls -lrth
total 32
-rw-r--r--  1 User  wheel     4B Sep  8 15:02 part-00000
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00002
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00001
-rw-r--r--  1 User  wheel   163B Sep  8 15:02 part-00003

// Stop context 
scala> spark.stop
scala> :quit

  2. Start a new context 2 and read the checkpointed dataset:
// Initialize a new context
scala> spark.sparkContext.applicationId
res0: String = local-1567969525656

SparkContext.checkpointFile is a protected[spark] method, so we need to create the class under the org.apache.spark package:

scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

package org.apache.spark
object RecoverCheckpoint {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}

Now recover the checkpointed RDD as an RDD[InternalRow] using the RecoverCheckpoint class defined above:

// Path from first context
scala> val checkPointFilePath = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"
scala> import org.apache.spark.RecoverCheckpoint
scala> import org.apache.spark.sql.catalyst.InternalRow
scala> import org.apache.spark.sql.types._
scala> val RecoveredRDD = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, checkPointFilePath)

// RDD is recovered as RDD[InternalRow]
scala> RecoveredRDD
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = ReliableCheckpointRDD[0] at recover at <console>:34

// Count matches with original
scala> RecoveredRDD.count
res3: Long = 3

To convert the recovered RDD into a Dataset, create a RecoverCheckpointRDDToDF helper:


// Need to convert RDD[InternalRow] to DataFrame
scala> :paste -raw
// Entering paste mode (ctrl-D to finish)

// Creating Dataframe from RDD[InternalRow]
package org.apache.spark.sql
object RecoverCheckpointRDDToDF {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  // internalCreateDataFrame is private[sql], hence this helper lives in the org.apache.spark.sql package
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

Finally, use RecoverCheckpointRDDToDF to get the Dataset back:

// Schema must be known beforehand (it is not stored with the checkpointed rows)
scala> val df_schema = StructType(List(StructField("id",LongType,false), StructField("random",DoubleType,false)))
df_schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(random,DoubleType,false))

scala> import org.apache.spark.sql.RecoverCheckpointRDDToDF
scala> val df = RecoverCheckpointRDDToDF.createDataFrame(spark, RecoveredRDD, df_schema)

scala> df.show
+---+------------------+
| id|            random|
+---+------------------+
|  0|0.8517439782779789|
|  1| 0.288880016535247|
|  2|0.7027831376739603|
+---+------------------+

// Same as first context

// Stop context
scala> spark.stop
scala> :quit
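
Putting the pieces together, here is a minimal standalone sketch of the recovery flow in the second application. It assumes the two helper objects above have already been compiled into the org.apache.spark and org.apache.spark.sql packages, and that the checkpoint path and the schema are known up front (neither can be discovered from the checkpoint files alone); the object name RestoreCheckpointedDF is illustrative:

import org.apache.spark.RecoverCheckpoint
import org.apache.spark.sql.{RecoverCheckpointRDDToDF, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DoubleType, LongType, StructField, StructType}

object RestoreCheckpointedDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("restore-checkpointed-df").getOrCreate()

    // Path written by the first application (value from the example above)
    val path = "/tmp/spark/checkpoint/7aabcb46-e707-49dd-8893-148a162368d5/rdd-7"

    // The schema must be supplied by the caller; it is not stored with the checkpointed rows
    val schema = StructType(List(
      StructField("id", LongType, nullable = false),
      StructField("random", DoubleType, nullable = false)))

    // Recover the RDD[InternalRow] and rebuild the DataFrame on top of it
    val recovered = RecoverCheckpoint.recover[InternalRow](spark.sparkContext, path)
    val df = RecoverCheckpointRDDToDF.createDataFrame(spark, recovered, schema)
    df.show()

    spark.stop()
  }
}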

