scala - 基于行值的条件连接过滤
问题描述
我有 2 个数据框需要连接在一起。我们将连接 2 列,但是,这 2 列不是 id,并且不会在另一个数据框中产生唯一的行值。例如,两个数据框将如下所示:
Dataframe 1
product_no dist code
040 wmn aj
040 wmn lm
040 wmn mn
Dataframe 2
p_no vendor code product
040 wmn ** y
040 wmn *j y
040 wmn mn n
2 列的查询如下所示:
df1.join(df2, $"product_no" === $"p_no" && $"dist" === $"vendor")
这会将所有行相互连接并具有重复项。但是,我们还想加入 bycode
例如,其中代码等于第二个数据帧中的实际代码。如果找不到实际的,则检查代码是否以 if 结尾,j
如果*j
没有任何匹配项,则 join by,然后 join by**
结果应如下所示:
**Outcome**
product_no dist code p_no vendor code product
040 wmn aj 040 wmn *j y
040 wmn lm 040 wmn ** y
040 wmn mn 040 wmn mn n
有没有办法我可以做到这一点?
解决方案
看一下这个:
scala> val df3 = df1.alias("t1").join(df2.alias("t2"),$"product_no" === $"p_no" && $"dist" === $"vendor").withColumn("match", when($"t1.code"===$"t2.code",lit(1)).when(regexp_extract($"t1.code",".*j",0)=!=lit("") && regexp_extract($"t2.code",".*j",0)=!=lit(""), 2).when(regexp_extract($"t1.code",".*[^j]$",0)=!=lit("") && regexp_extract($"t2.code","[*][*]",0)=!=lit(""), 3).otherwise(lit(0))).filter('match > 0).toDF("product_no","dist","code1","p_no","vendor","code2","product","match")
df3: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 6 more fields]
scala> val df4= df3.withColumn("match2", collect_set('code2) over(Window.partitionBy('product_no,'dist).orderBy('match)))
df4: org.apache.spark.sql.DataFrame = [product_no: string, dist: string ... 7 more fields]
scala> df4.show
+----------+----+-----+----+------+-----+-------+-----+------------+
|product_no|dist|code1|p_no|vendor|code2|product|match| match2|
+----------+----+-----+----+------+-----+-------+-----+------------+
| 040| wmn| mn| 040| wmn| mn| n| 1| [mn]|
| 040| wmn| aj| 040| wmn| *j| y| 2| [*j, mn]|
| 040| wmn| mn| 040| wmn| **| y| 3|[*j, mn, **]|
| 040| wmn| lm| 040| wmn| **| y| 3|[*j, mn, **]|
+----------+----+-----+----+------+-----+-------+-----+------------+
scala> df4.selectExpr("*"," match in (1,2) or ( not array_contains(match2,code1) ) as match3 ").where('match3).show
+----------+----+-----+----+------+-----+-------+-----+------------+------+
|product_no|dist|code1|p_no|vendor|code2|product|match| match2|match3|
+----------+----+-----+----+------+-----+-------+-----+------------+------+
| 040| wmn| mn| 040| wmn| mn| n| 1| [mn]| true|
| 040| wmn| aj| 040| wmn| *j| y| 2| [*j, mn]| true|
| 040| wmn| lm| 040| wmn| **| y| 3|[*j, mn, **]| true|
+----------+----+-----+----+------+-----+-------+-----+------------+------+
scala>
推荐阅读
- angular - 如何获取导致角度 5 错误的组件名称
- amazon-web-services - Ubuntu 16.04 上的 awslogs 服务和 CloudWatch Logs 代理问题
- android - 处理 ViewModel 中的数据和 MVVM 中的 Fragment
- sql - 如果用户是管理员,则控制列的显示
- docker - Chromedriver 未在 docker 中启动
- mysql - 在 MySQL 中拆分逗号分隔值
- android - 无法从 Kotlin Android 中解析的 Json 数据中为 TextView 赋值
- r - 如何交换R中两列中的值?
- php - 使用 PHP 在 SQL Server 中下载二进制内容
- amazon-cloudwatch - 如何使用全局保留策略创建 CloudWatch LogGroup?