首页 > 解决方案 > 在PySpark中动态生成连接条件作为列表时,如何在元素之间应用“OR”而不是“AND”?

问题描述

我正在加入两个数据框 site_bs 和 site_wrk_int1 并使用动态连接条件创建 site_wrk。

我的代码如下:

join_cond=[ col(v_col) == col('wrk_'+v_col) for v_col in primaryKeyCols]  #result would be 
site_wrk=site_bs.join(site_wrk_int1,join_cond,'inner').select(*site_bs.columns)

join_cond 将是动态的,其值类似于 [ col(id) == col(wrk_id), col(id) == col(wrk_parentId)]

在上述连接条件中,连接将同时满足上述两个条件。即,加入条件将是

id = wrk_id  and id = wrk_parentId 

但我想或条件应用如下

id = wrk_id  or id = wrk_parentId 

如何在 Pyspark 中实现这一点?

标签: apache-sparkpysparkapache-spark-sql

解决方案


由于列上的逻辑操作pyspark返回列对象,因此您可以在连接语句中链接这些条件,例如:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f


spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, "A", "A"),
    (2, "C", "C"), 
    (3, "E", "D"), 
], ['id', 'col1', 'col2'] 
)
df.show()
+---+----+----+
| id|col1|col2|
+---+----+----+
|  1|   A|   A|
|  2|   C|   C|
|  3|   E|   D|
+---+----+----+


df.alias("t1").join(
    df.alias("t2"),
    (f.col("t1.col1") == f.col("t2.col2")) | (f.col("t1.col1") == f.lit("E")),
    "left_outer"
).show(truncate=False)
+---+----+----+---+----+----+
|id |col1|col2|id |col1|col2|
+---+----+----+---+----+----+
|1  |A   |A   |1  |A   |A   |
|2  |C   |C   |2  |C   |C   |
|3  |E   |D   |1  |A   |A   |
|3  |E   |D   |2  |C   |C   |
|3  |E   |D   |3  |E   |D   |
+---+----+----+---+----+----+

如您所见,我得到了TrueID 为 1 和 2 的左行的值,col1 == col2 OR col1 == ETrue与我的 DataFrame 的三行相同。在语法方面,重要的是 Python 运算符 ( | & ...) 用右括号分隔,如上例所示,否则您可能会遇到令人困惑的py4j错误。

或者,如果您希望保持与问题中所述类似的符号,为什么不使用functools.reduce并将operator.or_此逻辑应用于您的列表,例如:

在这个例子中,我的AND列条件和 get 之间有一个条件NULL,正如预期的那样:

df.alias("t1").join(
    df.alias("t2"),
    [f.col("t1.col1") == f.col("t2.col2"),  f.col("t1.col1") == f.lit("E")],
    "left_outer"
).show(truncate=False)
+---+----+----+----+----+----+
|id |col1|col2|id  |col1|col2|
+---+----+----+----+----+----+
|3  |E   |D   |null|null|null|
|1  |A   |A   |null|null|null|
|2  |C   |C   |null|null|null|
+---+----+----+----+----+----+

在此示例中,我利用functoolsoperator获得与上述相同的结果:

df.alias("t1").join(
    df.alias("t2"),
    functools.reduce(
      operator.or_, 
      [f.col("t1.col1") == f.col("t2.col2"),  f.col("t1.col1") == f.lit("E")]),
    "left_outer"
).show(truncate=False)
+---+----+----+---+----+----+
|id |col1|col2|id |col1|col2|
+---+----+----+---+----+----+
|1  |A   |A   |1  |A   |A   |
|2  |C   |C   |2  |C   |C   |
|3  |E   |D   |1  |A   |A   |
|3  |E   |D   |2  |C   |C   |
|3  |E   |D   |3  |E   |D   |
+---+----+----+---+----+----+

推荐阅读