首页 > 解决方案 > Spark Join 最佳匹配效率问题

问题描述

我有 2 个数据框:

我需要在其中两列df_1df_2完全匹配并在第三列上保持最佳匹配。最佳匹配是指从左到右存在一个:多的关系,但是我只想在长度方面在右侧接收最佳匹配。

例如

# df_1
col1    col2    col3
---------------------------
a       b       abcde
# df_2
col1    col2    col3    col4
-------------------------------
a       b       a       90
a       b       ab      100
a       b       abc     150
a       c       abc     90

col1因此,当我匹配并且col2完全匹配并且col3包含字符串的最佳匹配时,连接的期望结果是:

col1    col2    col3    col4
-------------------------------
a       b       abcde   150

这里有一些对我不利的观点:

虽然我已经完成了这项工作,但我的表现却很糟糕

我已经尝试了以下解决方案,但仍然没有得到任何地方:

使用 spark 进行此连接的最高效方法是什么?

标签: apache-sparkjoinpyspark

解决方案


更好的选择是在加入之前减少数据大小(我们不能根除加入)。我们可以减少如下:

一、加载数据

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> df1.show
+---+---+-----+
| c1| c2|   c3|
+---+---+-----+
|  a|  b|abcde|
|  c|  d|   fd|
+---+---+-----+

scala> df2.show
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  a|  b|   a| 90|
|  a|  b| abd|100|
|  a|  b|abcd|150|
|  c|  d|wewe| 79|
+---+---+----+---+

现在我们需要在加入之前减少 df2 的大小(这将减少加入的时间,因为数据大小比较少)使用窗口函数并找出两列的最大值

scala> df2.withColumn("len", length($"c3")).withColumn("res", row_number().over(wind1)).filter($"res" === 1).withColumn("res2", row_number().over(wind2)).filter($"res2"=== 1).select("c1", "c2", "c3", "c4").show()
+---+---+----+---+
| c1| c2|  c3| c4|
+---+---+----+---+
|  c|  d|wewe| 79|
|  a|  b|abcd|150|
+---+---+----+---+

要尝试的事情:

1>您可以加入这个减少的数据框并应用您正在使用的逻辑

2>尝试做联合df1.withColumn("c4", lit(0)).union(df2),然后应用上述逻辑。

希望这可以帮助


推荐阅读