apache-spark - How to truncate data and drop all partitions from a Hive table using Spark
Problem description
How can I delete all data and drop all partitions from a Hive table, using Spark 2.3.0?
truncate table my_table; // Deletes all data, but keeps partitions in metastore
alter table my_table drop partition(p_col > 0) // does not work from spark
The only thing that works for me is to iterate through show partitions my_table, replace / with , and drop each partition one by one. But there must be a cleaner way, and it doesn't even work if the partition column is of type string. Any suggestions?
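For reference, a rough sketch of that per-partition loop might look like this (my_table is a placeholder, and the same string-column caveat applies):
// SHOW PARTITIONS returns one spec per row, e.g. "a=1/b=2", so we turn
// the "/" separators into "," to build a PARTITION clause.
spark.sql("SHOW PARTITIONS my_table").collect().foreach { row =>
  val spec = row.getString(0).replace("/", ", ")
  spark.sql(s"ALTER TABLE my_table DROP IF EXISTS PARTITION ($spec)")
}
// Caveat (as noted above): the values come back unquoted, so this fails
// for string-typed partition columns unless the values are quoted first.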
Solution
Let's set up the problem, using Spark 2.4.3:
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records
We use the external catalog's listPartitions and dropPartitions functions.
// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of all partitions
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// We pass them to the Catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", partitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
This works well for MANAGED tables, but what about EXTERNAL tables?
// We repeat the setup above but after creating an EXTERNAL table
// After dropping we see that the partitions appear to be gone (or are they?).
catalog.listPartitions("default", "potato").length // 0 partitions
// BUT repairing the table simply adds them again, the partitions/data
// were NOT deleted from the underlying filesystem. This is not what we wanted!
spark.sql("MSCK REPAIR TABLE potato")
catalog.listPartitions("default", "potato").length // 9 partitions again!
To handle this, we change the table from EXTERNAL to MANAGED before dropping the partitions.
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
// Identify the table in question
val identifier = TableIdentifier("potato", Some("default"))
// Get its current metadata
val tableMetadata = catalog.getTableMetadata(identifier)
// Clone the metadata while changing the tableType to MANAGED
val alteredMetadata = tableMetadata.copy(tableType = CatalogTableType.MANAGED)
// Alter the table using the new metadata
catalog.alterTable(alteredMetadata)
// Now drop!
catalog.dropPartitions("default", "potato", partitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
spark.sql("MSCK REPAIR TABLE potato") // Won't add anything
catalog.listPartitions("default", "potato").length // Still 0 partitions!
Don't forget to change the table back to EXTERNAL using CatalogTableType.EXTERNAL.
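That final step is symmetric to the earlier type change; a minimal sketch, reusing catalog and identifier from above (restoredMetadata is just an illustrative name):
// Flip the table type back to EXTERNAL so Hive no longer manages its data
val restoredMetadata = catalog.getTableMetadata(identifier)
  .copy(tableType = CatalogTableType.EXTERNAL)
catalog.alterTable(restoredMetadata)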