首页 > 解决方案 > 从 Spark 中删除分区

问题描述

我正在使用 Java-Spark (Spark 2.2.0)。

我正在尝试按如下方式删除 Hive 分区:

spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"

并得到以下异常:

org.apache.spark.sql.catalyst.parser.ParseException:不匹配的输入'<'期待{')',','}(第1行,位置42)

我知道这是未解决的问题ALTER TABLE DROP PARTITION 应该支持应该在我的版本中修复的比较器,但我仍然遇到异常。

从 Spark 中删除分区的替代方法是什么?还有另一种实现吗?

谢谢。

标签: apache-sparkhive

解决方案


似乎暂时没有办法做到这一点。如SPARK-14922所示,此修复的目标版本是 3.0.0,并且仍在进行中。

因此,我认为有两种可能的解决方法。

让我们使用 Spark 2.4.3 设置问题:

// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")

// Enable dynamic partitioning 
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")

// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))

// Verify inserts
spark.table("potato").count // 9 records

现在...尝试从 spark 内部删除单个分区!

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records

尝试删除多个分区不起作用。

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")

org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)

== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^

使用比较运算符删除一系列分区也不起作用。

spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")

org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)

== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^

这应该是因为分区列是一个字符串,而我们正在使用比较运算符。

我找到的解决方案是:

  1. 获取分区列表并有条件地过滤它们。
  2. 要么一个一个地删除各个分区,要么将它们作为[Map[String,String]( TablePartitionSpec) 的序列传递给目录的dropPartitions函数。

步骤1:

// Get External Catalog
val catalog = spark.sharedState.externalCatalog

// Get the spec from the list of partitions 
val partitions = catalog.listPartitions("default", "potato").map(_.spec)

// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
                                           .map(t => Map(t._1 -> t._2))

第2步:

我们将每个参数元组传递给单独的 ALTER TABLE DROP PARTITION 语句。

filteredPartitions.flatten.foreach(t => 
     spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records

或者将它们传递给目录的dropPartition函数。

// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
                       ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records

我希望这会有所帮助。如果您对 Spark 2.x 有更好的解决方案,请告诉我。


推荐阅读