scala - 倾斜的窗口函数和 Hive 源分区?
问题描述
我通过 Spark 读取的数据是高度倾斜的 Hive 表,具有以下统计信息。
(MIN、25TH、MEDIAN、75TH、MAX)通过 Spark UI:
1506.0 B / 0 232.4 KB / 27288 247.3 KB / 29025 371.0 KB / 42669 269.0 MB / 27197137
我相信当我执行一些Window Funcs
, 和Pivots
.
我尝试探索这个参数来限制分区大小,但是没有任何改变,并且分区在读取时仍然倾斜。
spark.conf.set("spark.sql.files.maxPartitionBytes")
此外,当我使用 Hive 表作为源缓存此 DF 时,它需要几分钟甚至在 Spark UI 中导致一些 GC,这很可能也是因为偏斜。
这spark.sql.files.maxPartitionBytes
适用于 Hive 表还是仅适用于文件?
处理这种倾斜的 Hive 源的最佳行动方案是什么?
像舞台屏障写入镶木地板或盐渍这样的东西是否适合这个问题?
我想避免.repartition()
阅读,因为它为工作的已经数据过山车增加了另一层。
谢谢
====================================================
经过进一步研究,似乎这Window Function
也导致了数据倾斜,这就是问题所在Spark Job
。
我正在time series
通过双精度执行一些填充Window Function
(向前然后向后填充以估算所有null
传感器读数),并尝试按照本文尝试一种salt
均匀分布的方法......但是以下代码会产生所有null
值,因此该salt
方法不起作用。
不知道为什么我要skews
追求,Window
因为我分区的每个度量项在通过检查后具有大致相同数量的记录.groupBy()
......因此为什么salt
需要?
+--------------------+-------+
| measure | count|
+--------------------+-------+
| v1 |5030265|
| v2 |5009780|
| v3 |5030526|
| v4 |5030504|
...
盐帖 => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18
nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))
# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
.orderBy('measure', 'date')\
.rowsBetween(Window.unboundedPreceding, 0)
# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")
window = Window.partitionBy('measure')\
.orderBy('measure', 'date')\
.rowsBetween(0,Window.unboundedFollowing)
# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")
解决方案
您可以使用配置单元配置启用倾斜连接优化。适用的设置是:
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;
请参阅 databricks 提示:
在这种情况下,倾斜提示可能会起作用
推荐阅读
- macos - macCatalyst 项目的最小 macOS 部署目标?
- python-3.x - 从 Python 写入 Jupyter Notebook
- java - 尝试将重复记录插入数据库时出现 sql 异常
- mysql - 语法错误之类的问题 sql
- reactjs - 无效的挂钩调用
- java - “没有已知文件:classes2.dex”是什么意思?
- makefile - 具有动态先决条件的动态 Makefile 目标
- python - Pyspark - 如何在组中聚合超过 4 小时的窗口
- function - 在powershell中将变量从一个函数返回到另一个函数
- selenium - Selenium 无法调用 Edge 浏览器版本 17763