apache-spark - 在读取镶木地板文件时刷新 Dataframe 的元数据
问题描述
我正在尝试将镶木地板文件读取为将定期更新的数据帧(路径为/folder_name
。每当有新数据出现时,旧的镶木地板文件路径(/folder_name
)将被重命名为临时路径,然后我们将新数据和旧数据联合起来将存储在旧路径(/folder_name
)
发生的情况是假设我们hdfs://folder_name/part-xxxx-xxx.snappy.parquet
在更新之前有一个镶木地板文件,然后在更新之后将其更改为hdfs://folder_name/part-00000-yyyy-yyy.snappy.parquet
正在发生的问题是当我尝试在更新完成时读取镶木地板文件时
sparksession.read.parquet("filename") => 它采用旧路径hdfs://folder_name/part-xxxx-xxx.snappy.parquet
(路径存在)
当在数据帧上调用一个操作时,它试图从中读取数据,hdfs://folder_name/part-xxxx-xxx.snappy.parquet
但由于更新文件名发生了变化,我遇到了以下问题
java.io.FileNotFoundException:文件不存在:hdfs://folder_name/part-xxxx-xxx.snappy.parquet
底层文件可能已更新。您可以通过在 SQL 中运行“REFRESH TABLE tableName”命令或通过重新创建所涉及的数据集/数据帧来显式地使 Spark 中的缓存无效。
我正在使用 Spark 2.2
谁能帮助我如何刷新元数据?
解决方案
当您尝试读取不存在的文件时会发生该错误。
如果我错了,请纠正我,但我怀疑您在保存新数据框时覆盖了所有文件(使用.mode("overwrite")
)。在此过程运行时,您正在尝试读取已删除的文件并引发异常 - 这会使表在一段时间内不可用(在更新期间)。
据我所知,没有你想要的“刷新元数据”的直接方法。
解决此问题的两种(几种可能的)方法:
1 - 使用附加模式
如果您只想将新数据框附加到旧数据框,则无需创建临时文件夹并覆盖旧数据框。您可以将保存模式从覆盖更改为追加。通过这种方式,您可以将分区添加到现有 Parquet 文件中,而无需重写现有分区。
df.write
.mode("append")
.parquet("/temp_table")
这是迄今为止最简单的解决方案,无需读取已存储的数据。但是,如果您必须更新旧数据(例如:如果您正在执行 upsert),这将不起作用。为此,您有选项 2:
2 - 使用 Hive 视图
您可以创建配置单元表并使用视图指向最新(和可用)的表。
以下是有关此方法背后逻辑的示例:
第1部分
- 如果视图
<table_name>
不存在,我们创建一个新表<table_name>_alpha0
来存储新数据 <table_name>
创建表后,我们创建一个视图select * from <table_name>_alpha0
第2部分
如果视图
<table_name>
存在,我们需要查看它指向哪个表(<table_name>_alphaN)
您对新数据执行所需的所有操作,将其保存为名为
<table_name>_alpha(N+1)
创建表后,我们将视图更改
<table_name>
为select * from <table_name>_alpha(N+1)
还有一个代码示例:
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._
import spark.implicits._
//This method verifies if the view exists and returns the table it is pointing to (using the query 'describe formatted')
def getCurrentTable(spark: SparkSession, databaseName:String, tableName: String): Option[String] = {
if(spark.catalog.tableExists(s"${databaseName}.${tableName}")) {
val rdd_desc = spark.sql(s"describe formatted ${databaseName}.${tableName}")
.filter("col_name == 'View Text'")
.rdd
if(rdd_desc.isEmpty()) {
None
}
else {
Option(
rdd_desc.first()
.get(1)
.toString
.toLowerCase
.stripPrefix("select * from ")
)
}
}
else
None
}
//This method saves a dataframe in the next "alpha table" and updates the view. It maintains 'rounds' tables (default=3). I.e. if the current table is alpha2, the next one will be alpha0 again.
def saveDataframe(spark: SparkSession, databaseName:String, tableName: String, new_df: DataFrame, rounds: Int = 3): Unit ={
val currentTable = getCurrentTable(spark, databaseName, tableName).getOrElse(s"${databaseName}.${tableName}_alpha${rounds-1}")
val nextAlphaTable = currentTable.replace(s"_alpha${currentTable.last}",s"_alpha${(currentTable.last.toInt + 1) % rounds}")
new_df.write
.mode("overwrite")
.format("parquet")
.option("compression","snappy")
.saveAsTable(nextAlphaTable)
spark.sql(s"create or replace view ${databaseName}.${tableName} as select * from ${nextAlphaTable}")
}
//An example on how to use this:
//SparkSession: spark
val df = Seq((1,"I"),(2,"am"),(3,"a"),(4,"dataframe")).toDF("id","text")
val new_data = Seq((5,"with"),(6,"new"),(7,"data")).toDF("id","text")
val dbName = "test_db"
val tableName = "alpha_test_table"
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
println("Saving dataframe")
saveDataframe(spark, dbName, tableName, df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
val processed_df = df.unionByName(new_data) //Or other operations you want to do
println("Saving new dataframe")
saveDataframe(spark, dbName, tableName, processed_df)
println("Dataframe saved")
println(s"Current table: ${getCurrentTable(spark, dbName, tableName).getOrElse("Table does not exist")}")
spark.read.table(s"${dbName}.${tableName}").show
结果:
Current table: Table does not exist
Saving dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha0
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 1| I|
| 2| am|
+---+---------+
Saving new dataframe
Dataframe saved
Current table: test_db.alpha_test_table_alpha1
+---+---------+
| id| text|
+---+---------+
| 3| a|
| 4|dataframe|
| 5| with|
| 6| new|
| 7| data|
| 1| I|
| 2| am|
+---+---------+
通过这样做,您可以保证视图的一个版本<table_name>
将始终可用。这也具有维护表的先前版本的优点(或没有,取决于您的情况)。即以前的版本<table_name_alpha1>
将是<table_name_alpha0>
3 - 奖金
如果可以选择升级 Spark 版本,请查看Delta Lake(最低 Spark 版本:2.4.2)
希望这可以帮助 :)
推荐阅读
- visual-studio - Visual Studio -> 包装每个参数
- javascript - React - 流和条件渲染
- java - webapp1的html5视频使用webapp2的源码,如何避免webapp2暴露给外网浏览器
- xcode - 我能以某种方式获取 NSErrorDomain: CloudPhotoLibraryErrorDomain 的可用错误代码常量列表吗?
- file-upload - 使用 actix-web-Framework 在 Rust 中的网络服务器上上传文件时使用原始文件名保存文件
- angular - 为什么使用 redux-state-sync 时即使状态改变了 redux @select 也不起作用?
- javascript - Highcharts删除注解抛出TypeError
- android - 使用房间分页时,LiveData 仅触发一次
- php - 如何使用 POST 构造将一页变量数据发布到两个不同的页面?
- apache-spark - 使用 PySpark 将 CSV 文件转换为 Parquet 时出现问题:内存不足