How to truncate data and drop all partitions from a Hive table using Spark

Problem Description

How do I delete all data and drop all partitions from a Hive table, using Spark 2.3.0?

truncate table my_table; -- deletes all data, but keeps the partitions in the metastore

alter table my_table drop partition(p_col > 0); -- does not work from Spark

The only thing that works for me is to iterate through show partitions my_table, replace / with , and drop each partition one by one. But there must be a cleaner way. And it doesn't even work if the partition columns are of type string. Any suggestions?
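
For reference, here is a minimal sketch of that one-by-one workaround, using the table name from the question; quoting the values for string partition columns is exactly the part that breaks:

// Rewrite each "col1=val/col2=val" spec from SHOW PARTITIONS into a
// DROP PARTITION clause by replacing "/" with ","
spark.sql("SHOW PARTITIONS my_table").collect().foreach { row =>
  val spec = row.getString(0).replace("/", ",")
  // Works for numeric partition values; string values would need quoting here
  spark.sql(s"ALTER TABLE my_table DROP IF EXISTS PARTITION ($spec)")
}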

Tags: apache-spark, hive, apache-spark-sql, hiveql

Solution


Let's set up the problem using Spark 2.4.3:

// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")

// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")

// Insert some dummy records, one per hourly partition
(1 to 9).foreach(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))

// Verify inserts
spark.table("potato").count // 9 records
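
As a quick sanity check (not part of the original answer), we can confirm that each insert created its own hourly partition:

// Dynamic partitioning created one partition per distinct hour value
spark.sql("SHOW PARTITIONS potato").count // 9 partitions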

We use the external catalog's listPartitions and dropPartitions functions.

// Get External Catalog
val catalog = spark.sharedState.externalCatalog

// Get the spec from the list of all partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)

// We pass them to the Catalog's dropPartitions function.
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", partitions,
  ignoreIfNotExists = true, purge = true, retainData = false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
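
Incidentally, since listPartitions returns each partition spec as a Map[String, String], the range-style drop the question attempted (p_col > 0) can be expressed by filtering the specs before dropping. A sketch, reusing the names above with a hypothetical cutoff:

// Drop only the partitions whose spec matches a predicate
val earlyHours = catalog.listPartitions("default", "potato")
  .map(_.spec)
  .filter(_("hour") < "2020-06-07T05") // hypothetical cutoff value

catalog.dropPartitions("default", "potato", earlyHours,
  ignoreIfNotExists = true, purge = true, retainData = false)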

This works fine for MANAGED tables, but what about EXTERNAL tables?
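
The original answer doesn't show the EXTERNAL setup, so here is a minimal sketch, assuming a scratch location (the path is hypothetical):

// Same schema as before, but EXTERNAL with an explicit location (hypothetical path)
spark.sql("""CREATE EXTERNAL TABLE IF NOT EXISTS potato (size INT)
             PARTITIONED BY (hour STRING)
             LOCATION '/tmp/potato'""")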

// We repeat the setup above but after creating an EXTERNAL table
// After dropping we see that the partitions appear to be gone (or are they?).
catalog.listPartitions("default", "potato").length // 0 partitions

// BUT repairing the table simply adds them again, the partitions/data 
// were NOT deleted from the underlying filesystem. This is not what we wanted!
spark.sql("MSCK REPAIR TABLE potato")
catalog.listPartitions("default", "potato").length // 9 partitions again!   

To get around this, we change the table from EXTERNAL to MANAGED before dropping the partitions.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Identify the table in question
val identifier = TableIdentifier("potato", Some("default"))

// Get its current metadata
val tableMetadata = catalog.getTableMetadata(identifier)

// Clone the metadata while changing the tableType to MANAGED
val alteredMetadata = tableMetadata.copy(tableType = CatalogTableType.MANAGED)

// Alter the table using the new metadata
catalog.alterTable(alteredMetadata)

// Now drop!
catalog.dropPartitions("default", "potato", partitions,
  ignoreIfNotExists = true, purge = true, retainData = false)
spark.table("potato").count // 0 records
catalog.listPartitions("default", "potato").length // 0 partitions
spark.sql("MSCK REPAIR TABLE potato") // Won't add anything
catalog.listPartitions("default", "potato").length // Still 0 partitions!

Don't forget to change the table back to EXTERNAL using CatalogTableType.EXTERNAL.
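
The flip back follows the same pattern as above:

// Restore the table type now that the partitions are gone
val restoredMetadata = catalog.getTableMetadata(identifier)
  .copy(tableType = CatalogTableType.EXTERNAL)
catalog.alterTable(restoredMetadata)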

