首页 > 解决方案 > 在读取镶木地板文件时刷新 Dataframe 的元数据

问题描述

我正在尝试将镶木地板文件读取为将定期更新的数据帧(路径为/folder_name。每当有新数据出现时,旧的镶木地板文件路径(/folder_name)将被重命名为临时路径,然后我们将新数据和旧数据联合起来将存储在旧路径(/folder_name

发生的情况是假设我们hdfs://folder_name/part-xxxx-xxx.snappy.parquet在更新之前有一个镶木地板文件,然后在更新之后将其更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet

正在发生的问题是当我尝试在更新完成时读取镶木地板文件时

sparksession.read.parquet("filename") => 它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet(路径存在)

当在数据帧上调用一个操作时,它试图从中读取数据,hdfs://folder_name/part-xxxx-xxx.snappy.parquet但由于更新文件名发生了变化,我遇到了以下问题

java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet 底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或通过重新创建所涉及的数据集/数据帧来显式地使 Spark 中的缓存无效。

我正在使用 Spark 2.2

谁能帮助我如何刷新元数据?

标签: apache-sparkapache-spark-sqlparquetapache-spark-dataset

解决方案


当您尝试读取不存在的文件时会发生该错误。

如果我错了,请纠正我,但我怀疑您在保存新数据框时覆盖了所有文件(使用.mode("overwrite"))。在此过程运行时,您正在尝试读取已删除的文件并引发异常 - 这会使表在一段时间内不可用(在更新期间)。

据我所知,没有你想要的“刷新元数据”的直接方法。

解决此问题的两种(几种可能的)方法:

1 - 使用附加模式

如果您只想将新数据框附加到旧数据框,则无需创建临时文件夹并覆盖旧数据框。您可以将保存模式从覆盖更改为追加。通过这种方式,您可以将分区添加到现有 Parquet 文件中,而无需重写现有分区。

df.write
  .mode("append")
  .parquet("/temp_table")

这是迄今为止最简单的解决方案,无需读取已存储的数据。但是,如果您必须更新旧数据(例如:如果您正在执行 upsert),这将不起作用。为此,您有选项 2:

2 - 使用 Hive 视图

您可以创建配置单元表并使用视图指向最新(和可用)的表。

以下是有关此方法背后逻辑的示例:

第1部分

  • 如果视图<table_name>不存在,我们创建一个新表 <table_name>_alpha0来存储新数据
  • <table_name>创建表后,我们创建一个视图select * from <table_name>_alpha0

第2部分

  • 如果视图<table_name>存在,我们需要查看它指向哪个表(<table_name>_alphaN)

  • 您对新数据执行所需的所有操作,将其保存为名为<table_name>_alpha(N+1)

  • 创建表后,我们将视图更改<table_name>select * from <table_name>_alpha(N+1)

还有一个代码示例:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._


//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')

def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
  if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {

    val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
      .filter("col_name == 'View Text'")
      .rdd

    if(rdd_desc.isEmpty()) {
      None
    }
    else {
      Option(
        rdd_desc.first()
          .get(1)
          .toString
          .toLowerCase
          .stripPrefix("select * from ")
      )
    }
  }
  else
    None
}

//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.

def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
  val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
  val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")

  new_df.write
    .mode("overwrite")
    .format("parquet")
    .option("compression","snappy")
    .saveAsTable(nextAlphaTable)

  spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}

//An example on how to use this:

//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"

println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")

saveDataframe(spark, dbName, tableName, df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

val processed_df = df.unionByName(new_data) //Or other operations you want to do

println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)

println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show

结果:

Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  1|        I|
|  2|       am|
+---+---------+

Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id|     text|
+---+---------+
|  3|        a|
|  4|dataframe|
|  5|     with|
|  6|      new|
|  7|     data|
|  1|        I|
|  2|       am|
+---+---------+

通过这样做,您可以保证视图的一个版本<table_name>将始终可用。这也具有维护表的先前版本的优点(或没有,取决于您的情况)。以前的版本<table_name_alpha1>将是<table_name_alpha0>

3 - 奖金

如果可以选择升级 Spark 版本,请查看Delta Lake(最低 Spark 版本:2.4.2)

希望这可以帮助 :)


推荐阅读