首页 > 解决方案 > 性能调优 SparkSQL 查询(带反连接)

问题描述

我最近被介绍给Spark-SQL. 我编写了一个Spark-SQL运行了很长时间的查询,因此我需要对其进行调整以将其执行时间限制在可接受的范围内。

首先,该查询在表和目标Anti-Join表之间使用一个,以便丢弃 tableT 中任何已经存在的键记录,只考虑来自源的新键记录。只有来自 src tableA 的记录需要被拾取,这些记录还没有出现在 tableT 中。

首先,我很想知道是否可以重写此查询以减少执行时间。其次,如果通过加入 tableT 从源中获取新记录的逻辑可以以不同的方式重写以提高效率。

表A(来源):

id|name|Loc|cover_sk|dtid
10|John|CA|2346|3
20|Mark|MD|7459|5
30|Mike|MO|1345|6
40|Josh|CT|9898|2

tableT(上一次运行后的当前状态)

id|name|loc|cover|day_of_week
20|Mark|MD|1234|5
40|Josh|CT|6789|6

表C

cover_sk|cover_key|start_date
2346|EXT|2018-03-23
7459|AMB|2019-12-31
1345|DFE|2015-05-06
9898|RTE|2017-09-23
6189|EXT|2014|01-01

datedim(日期维度):

dateid|day_of_week
2|2
3|3
5|5
6|6
8|1
9|2

预期输出:

id|name|loc|cover|day_of_week
10|John|CA|1234|3
30|Mike|MO|6789|6

要求

  1. 从 src 中,选择 tableT 中不存在的新记录。在这种情况下,id 20 和 40 已经存在于 tableT 中,因此我们从源 (tableA) 中提取并继续处理剩余的记录 -id 即 10 和 30。从 tableA 中丢弃 20 和 40,因为它们已经存在于 tableT 中。
  2. 将 tableC 与上一步中选择的数据连接起来以获取cover_key。在这种情况下,选择了 2346 和 1345,因为在步骤 1 中我们选择了 ids 10 和 30 并忽略了 ids 20、40,因为它已经存在于 tableT 中
  3. 最后将上一步的结果集加入到 datedim 以获取 day_of_week。

我有以下查询:

SELECT
src.id, src.name, src.loc, c.cover_key, dt.day_of_week
FROM tableA src -- source table
LEFT ANTI JOIN tableT tgt -- Join with tableT to identify and discard existing keys
ON src.id = tgt.id
LEFT JOIN tableC c
ON src.cover_sk = c.cover_sk
INNER JOIN datedim dt
ON src.dateid = dt.dateid

表的大小如下:

tableA: 389,000,000 rec
tableT: 300,000,000 rec
tableC: 16,000 rec
datedim: 115 rec

整个查询连续运行了 2 个小时,我认为它有一些调整的基础。
另外,我尝试了普通的 LEFT JOIN 而不是 LEFT ANTI JOIN 来进行记录存在性检查(tableT 中的记录),但徒劳无功。
任何帮助表示赞赏。

注意:我只能访问Spark-SQL不能访问PySparkSQL

谢谢

标签: sqlapache-spark-sql

解决方案


我建议对 tableA 和 tableB 使用 id 列进行重新分区,至于分区的数量,它取决于使用的执行器/核心/内存的数量。此外,尝试广播 tableC 和 datedim 表。希望这可以帮助。


推荐阅读