首页 > 解决方案 > 基于行值的条件连接过滤

问题描述

我有 2 个数据框需要连接在一起。我们将连接 2 列,但是,这 2 列不是 id,并且不会在另一个数据框中产生唯一的行值。例如,两个数据框将如下所示:

Dataframe 1 
    product_no   dist   code
    040          wmn    aj
    040          wmn    lm
    040          wmn    mn

Dataframe 2
    p_no   vendor    code   product
    040    wmn       **     y
    040    wmn       *j     y
    040    wmn       mn     n

2 列的查询如下所示:

df1.join(df2, $"product_no" === $"p_no" && $"dist" === $"vendor")

这会将所有行相互连接并具有重复项。但是,我们还想加入 bycode例如,其中代码等于第二个数据帧中的实际代码。如果找不到实际的,则检查代码是否以 if 结尾,j如果*j没有任何匹配项,则 join by,然后 join by**结果应如下所示:

**Outcome**
        product_no   dist   code  p_no   vendor    code   product
        040          wmn    aj    040    wmn       *j     y
        040          wmn    lm    040    wmn       **     y
        040          wmn    mn    040    wmn       mn     n

有没有办法我可以做到这一点?

标签: scalaapache-sparkjoin

解决方案


看一下这个:

scala> val df3 = df1.alias("t1").join(df2.alias("t2"),$"product_no" === $"p_no" && $"dist" === $"vendor").withColumn("match", when($"t1.code"===$"t2.code",lit(1)).when(regexp_extract($"t1.code",".*j",0)=!=lit("") && regexp_extract($"t2.code",".*j",0)=!=lit(""), 2).when(regexp_extract($"t1.code",".*[^j]$",0)=!=lit("") &&  regexp_extract($"t2.code","[*][*]",0)=!=lit(""), 3).otherwise(lit(0))).filter('match > 0).toDF("product_no","dist","code1","p_no","vendor","code2","product","match")
df3: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 6 more fields]

scala> val df4= df3.withColumn("match2", collect_set('code2) over(Window.partitionBy('product_no,'dist).orderBy('match)))
df4: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 7 more fields]

scala> df4.show
+----------+----+-----+----+------+-----+-------+-----+------------+
|product_no|dist|code1|p_no|vendor|code2|product|match|      match2|
+----------+----+-----+----+------+-----+-------+-----+------------+
|       040| wmn|   mn| 040|   wmn|   mn|      n|    1|        [mn]|
|       040| wmn|   aj| 040|   wmn|   *j|      y|    2|    [*j, mn]|
|       040| wmn|   mn| 040|   wmn|   **|      y|    3|[*j, mn, **]|
|       040| wmn|   lm| 040|   wmn|   **|      y|    3|[*j, mn, **]|
+----------+----+-----+----+------+-----+-------+-----+------------+

scala> df4.selectExpr("*"," match in (1,2) or ( not array_contains(match2,code1) ) as match3 ").where('match3).show
+----------+----+-----+----+------+-----+-------+-----+------------+------+
|product_no|dist|code1|p_no|vendor|code2|product|match|      match2|match3|
+----------+----+-----+----+------+-----+-------+-----+------------+------+
|       040| wmn|   mn| 040|   wmn|   mn|      n|    1|        [mn]|  true|
|       040| wmn|   aj| 040|   wmn|   *j|      y|    2|    [*j, mn]|  true|
|       040| wmn|   lm| 040|   wmn|   **|      y|    3|[*j, mn, **]|  true|
+----------+----+-----+----+------+-----+-------+-----+------------+------+


scala>

推荐阅读