首页 > 解决方案 > 倾斜的窗口函数和 Hive 源分区?

问题描述

我通过 Spark 读取的数据是高度倾斜的 Hive 表,具有以下统计信息。

(MIN、25TH、MEDIAN、75TH、MAX)通过 Spark UI:

1506.0 B / 0 232.4 KB / 27288 247.3 KB / 29025 371.0 KB / 42669 269.0 MB / 27197137

我相信当我执行一些Window Funcs, 和Pivots.

我尝试探索这个参数来限制分区大小,但是没有任何改变,并且分区在读取时仍然倾斜。

spark.conf.set("spark.sql.files.maxPartitionBytes")

此外,当我使用 Hive 表作为源缓存此 DF 时,它需要几分钟甚至在 Spark UI 中导致一些 GC,这很可能也是因为偏斜。

spark.sql.files.maxPartitionBytes适用于 Hive 表还是仅适用于文件?

处理这种倾斜的 Hive 源的最佳行动方案是什么?

像舞台屏障写入镶木地板或盐渍这样的东西是否适合这个问题?

我想避免.repartition()阅读,因为它为工作的已经数据过山车增加了另一层。

谢谢

====================================================

经过进一步研究,似乎这Window Function也导致了数据倾斜,这就是问题所在Spark Job

我正在time series通过双精度执行一些填充Window Function(向前然后向后填充以估算所有null传感器读数),并尝试按照本文尝试一种salt均匀分布的方法......但是以下代码会产生所有null值,因此该salt方法不起作用。

不知道为什么我要skews追求,Window因为我分区的每个度量项在通过检查后具有大致相同数量的记录.groupBy()......因此为什么salt需要?

+--------------------+-------+
|          measure   |  count|
+--------------------+-------+
|    v1              |5030265|
|      v2            |5009780|
|     v3             |5030526|
| v4                 |5030504|
...

盐帖 => https://medium.com/appsflyer/salting-your-spark-to-scale-e6f1c87dd18

nSaltBins = 300 # based off number of "measure" values
df_fill = df_fill.withColumn("salt", (F.rand() * nSaltBins).cast("int"))

# FILLS [FORWARD + BACKWARD]
window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(Window.unboundedPreceding, 0)

# FORWARD FILLING IMPUTER
ffill_imputer = F.last(df_fill['new_value'], ignorenulls=True)\
.over(window)
fill_measure_DF = df_fill.withColumn('value_impute_temp', ffill_imputer)\
.drop("value", "new_value")

window = Window.partitionBy('measure')\
               .orderBy('measure', 'date')\
               .rowsBetween(0,Window.unboundedFollowing)

# BACKWARD FILLING IMPUTER
bfill_imputer = F.first(df_fill['value_impute_temp'], ignorenulls=True)\
.over(window)
df_fill = df_fill.withColumn('value_impute_final', bfill_imputer)\
.drop("value_impute_temp")

标签: scalaapache-sparkhivepysparktime-series

解决方案


基于 Hive 的解决方案:

您可以使用配置单元配置启用倾斜连接优化。适用的设置是:

set hive.optimize.skewjoin=true;
set hive.skewjoin.key=500000;
set hive.skewjoin.mapjoin.map.tasks=10000;
set hive.skewjoin.mapjoin.min.split=33554432;

请参阅 databricks 提示:

在这种情况下,倾斜提示可能会起作用


推荐阅读