apache-spark - 从 Spark 中删除分区
问题描述
我正在使用 Java-Spark (Spark 2.2.0)。
我正在尝试按如下方式删除 Hive 分区:
spark.sql(""ALTER TABLE backup DROP PARTITION (date < '20180910')"
并得到以下异常:
org.apache.spark.sql.catalyst.parser.ParseException:不匹配的输入'<'期待{')',','}(第1行,位置42)
我知道这是未解决的问题ALTER TABLE DROP PARTITION 应该支持应该在我的版本中修复的比较器,但我仍然遇到异常。
从 Spark 中删除分区的替代方法是什么?还有另一种实现吗?
谢谢。
解决方案
似乎暂时没有办法做到这一点。如SPARK-14922所示,此修复的目标版本是 3.0.0,并且仍在进行中。
因此,我认为有两种可能的解决方法。
让我们使用 Spark 2.4.3 设置问题:
// We create the table
spark.sql("CREATE TABLE IF NOT EXISTS potato (size INT) PARTITIONED BY (hour STRING)")
// Enable dynamic partitioning
spark.conf.set("hive.exec.dynamic.partition.mode","nonstrict")
// Insert some dummy records
(1 to 9).map(i => spark.sql(s"INSERT INTO potato VALUES ($i, '2020-06-07T0$i')"))
// Verify inserts
spark.table("potato").count // 9 records
现在...尝试从 spark 内部删除单个分区!
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour='2020-06-07T01')""")
spark.table("potato").count // 8 records
尝试删除多个分区不起作用。
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
Found duplicate keys 'hour'.(line 1, pos 34)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour="2020-06-07T02", hour="2020-06-07T03")
----------------------------------^^^
使用比较运算符删除一系列分区也不起作用。
spark.sql("""ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")""")
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input '<=' expecting {')', ','}(line 1, pos 49)
== SQL ==
ALTER TABLE potato DROP IF EXISTS PARTITION (hour<="2020-06-07T03")
-------------------------------------------------^^^
这应该是因为分区列是一个字符串,而我们正在使用比较运算符。
我找到的解决方案是:
- 获取分区列表并有条件地过滤它们。
- 要么一个一个地删除各个分区,要么将它们作为
[Map[String,String]
(TablePartitionSpec
) 的序列传递给目录的dropPartitions
函数。
步骤1:
// Get External Catalog
val catalog = spark.sharedState.externalCatalog
// Get the spec from the list of partitions
val partitions = catalog.listPartitions("default", "potato").map(_.spec)
// Filter according to the condition you attempted.
val filteredPartitions = partitions.flatten.filter(_._2 <= "2020-06-07T03")
.map(t => Map(t._1 -> t._2))
第2步:
我们将每个参数元组传递给单独的 ALTER TABLE DROP PARTITION 语句。
filteredPartitions.flatten.foreach(t =>
spark.sql(s"""ALTER TABLE potato DROP IF EXISTS PARTITION (${t._1}="${t._2}")"""))
spark.table("potato").count // 6 records
或者将它们传递给目录的dropPartition
函数。
// If you purge data, it gets deleted immediately and isn't moved to trash.
// This takes precedence over retainData, so even if you retainData but purge,
// your data is gone.
catalog.dropPartitions("default", "potato", filteredPartitions,
ignoreIfNotExists=true, purge=true, retainData=false)
spark.table("potato").count // 6 records
我希望这会有所帮助。如果您对 Spark 2.x 有更好的解决方案,请告诉我。
推荐阅读
- javafx-8 - 如何获取表格视图的一列的最大值和最小值
- json - 格式化 PSCustomObject 以转换为 JSON
- javascript - 这个冒泡排序功能有什么问题?
- python - 如何让这个程序打印出列表中列表的内容?
- sql - 获取不匹配的记录
- java - 将格式为 1/1/2010 3:23:12 PM +00:00 的字符串模式转换为 Java.util.Date
- unity3d - Sprite Mask 选项不出现
- python - 在 python Jupyter 中调试 asyncio websockets
- bixby - Bixby 胶囊无法在手机上启动
- r - 在 R data.table 中具有外部指定的四分位数断点的四分位数排序器