首页 > 解决方案 > Spark 中针对存储在 Hive 表中的图的长线性查询

问题描述

假设我有一个图 G 和以下查询:

     x     y     z     w     q     r    s
(?a)--(?b)--(?c)--(?d)--(?e)--(?f)--(?g)--(?h)

其中{?a, ?b, ?c, ..., ?h}是变量,{x, y, z, w, q, r, s}是弧标签。

在存储级别,每个标签都有一个表格,但也有两个标签的组合。例如,我可能有一个带有列|a|b|的表x ,但我也有一个带有列|a|b|c|的表xy . 是的,我有多余的桌子。

基于此设置,我有两个问题:

a)我需要找到表,以便它们之间的连接导致最佳执行时间(最小)。让 {xy zw, q, rs} 成为上面示例的那些表。

b) 我必须按照给定的顺序执行连接,所以我需要找到那个顺序,例如:(rs ⨝ q) ⨝ (zw ⨝ xy)(⨝ 是自然连接)。

假设我知道要使用哪些表,即我已经解决了 a),我的问题是如何处理第二个表。Spark API 允许我在一行中执行所有连接:

val res1 = xy.join(zw, Seq("c")).join(q, Seq("e")).join(rs, Seq("f"))

但我也可以分几行执行它:

val tmp1 = xy.join(zw, Seq("c"))

val tmp2 = q.join(rs, Seq("f"))

val res2 = tmp1.join(tmp2, Seq("e"))

res1.count 和 res2.count 的执行时间(几次运行的平均值)在我的实验中是不同的。树的构建方式似乎对执行有影响。

1) 我可以使用哪种策略来构建一棵树,从而在 Spark 中实现最佳执行时间?

2)如果每棵不同的树似乎会导致不同的性能,那么查询优化器的作用是什么?加入排序。它似乎什么也没做,尤其是在我将所有连接都放在一行代码中的情况下:

val res1 = xy.join(zw, Seq("c")).join(q, Seq("e")).join(rs, Seq("f"))

val res3 = rs.join(q, Seq("f")).join(zw, Seq("e")).join(xy, Seq("c"))

在一种情况下,我可以有一个合理的执行时间。在另一个超时。Catalyst 没有做任何事情吗?

标签: scalaapache-sparkgraphapache-spark-sqlcatalyst-optimizer

解决方案


Spark API 允许我在一行中执行所有连接:

但我也可以分几行执行它:

不正确。此时没有执行,但只有在您执行操作时才会执行。您展示的是使用 Scala 中创建相同查询计划的高级运算符编写相同计算图的不同方法。

1) 我可以使用哪种策略来构建一棵树,从而在 Spark 中实现最佳执行时间?

这就是所谓的 Catalyst Optimizer(不是您)的目的。您可能想探索负责确保最佳连接性能的CostBasedJoinReorder逻辑优化和JoinSelection执行计划策略。

SparkPlanner使用 JoinSelection 执行计划策略将Join逻辑运算符计划到支持的连接物理运算符之一。

CostBasedJoinReorder是一种逻辑优化,用于在基于成本的优化中重新排序连接。

如果表的大小很重要,请考虑基于成本的优化 (CBO)。你应该看到不同。您必须使用表(不是任何关系)并执行ANALYZE TABLE COMPUTE STATISTICS统计命令。

Catalyst 没有做任何事情吗?

它应该优化连接。解释查询计划以获取更多详细信息。


推荐阅读