python - PySpark 模式匹配和分配关联值
问题描述
df1:
campaign_name campaign_team
einsurancep09 other
estoreemicardcdwpnov06 other
estoreemicardwmnov06 other
estoreemicardgenericspnov06 other
df2:
terms product_category product
insurance insurance null
def emi store
ab bhi asd
de lic cards
a credit cards
以下是我的场景:
- '
terms
' 列(df2)按字符串长度降序排列。 - 它应该与
campaign_name
df1 的 进行比较contains/like
。 - 无论哪个
terms
字符串matches first
带有campaign_name
,它的 product_category 和 product 都应该被选中,并且应该作为新列添加到 df1。 - 对于
campaign_name
值“einsurancep09
”,“insurance
”值来自terms
包含在campaign_name 中,因此它的product_category 和产品被拾取并作为df1 添加到输出中。 - 另一个例如:考虑其余 3 条记录 where you
contain
def
,ab
andde
在campaign_name
字符串中,但我们选择 product_category 和 product 的 "def
" 作为它appeared first
,并且是longest in the length
与 "ab" 和 "de" 比较时
下面是我的代码:
df1 = df1.withColumn("product_category",when(df1.campaign_name.contains(df2.terms),df2.product_category).otherwise('other'))
但是,它给了我以下错误:
raise converted from None
pyspark.sql.utils.AnalysisException: Resolved attribute(s) terms#37,product_category#38 missing from campaign_name#16,campaign_team#17 in operator !Project [campaign_name#16, campaign_team#17, CASE WHEN Contains(campaign_name#16, terms#37) THEN product_category#38 ELSE other END AS product_category#44].;
!Project [campaign_name#16, campaign_team#17, CASE WHEN Contains(campaign_name#16, terms#37) THEN product_category#38 ELSE other END AS product_category#44]
+- Relation[campaign_name#16,campaign_team#17] csv
那么我哪里错了?
根据堆栈的回答,我得到以下输出:
+---------------+-------------+---------+----------------+-------+
|campaign_name |campaign_team|terms |product_category|product|
+---------------+-------------+---------+----------------+-------+
|einsurancepnm06|other |insurance|Insurance |NaN |
+---------------+-------------+---------+----------------+-------+
解决方案
假设
数据集 df1 应该有一个满足 OP 要求的顺序。所以我介绍了rec_no列
df = spark.sql("""
select 'abcdefcdwpnovo6' campaign_name, 'other' campaign_team union all
select 'abcdefdwpnovo6' , 'other' union all
select 'abcdefgenericpnovo6' , 'other'
""")
df.createOrReplaceTempView("df")
df.show()
+-------------------+-------------+
| campaign_name|campaign_team|
+-------------------+-------------+
| abcdefcdwpnovo6| other|
| abcdefdwpnovo6| other|
|abcdefgenericpnovo6| other|
+-------------------+-------------+
df1 = spark.sql("""
select 1 rec_no, 'def' terms, 'emi' product_category, 'store' product union all
select 2, 'abc' ,'bhi' ,'asd' union all
select 3, 'de' ,'lic' ,'cards' union all
select 4, 'a' ,'credit' ,'cards'
""")
df1.createOrReplaceTempView("df1")
df1.show()
+------+-----+----------------+-------+
|rec_no|terms|product_category|product|
+------+-----+----------------+-------+
| 1| def| emi| store|
| 2| abc| bhi| asd|
| 3| de| lic| cards|
| 4| a| credit| cards|
+------+-----+----------------+-------+
输出:
您可以删除 ps、rk 和 rec_no 列
spark.sql("""
with t1 ( select * from df a cross join df1 b ),
t2 ( select rec_no, campaign_name,campaign_team,terms,product_category,product x, position(terms,campaign_name) ps,
rank() over(order by rec_no) rk from t1 where position(terms,campaign_name)>0 )
select * from t2 where rk=1
""").show()
+------+-------------------+-------------+-----+----------------+-----+---+---+
|rec_no| campaign_name|campaign_team|terms|product_category| x| ps| rk|
+------+-------------------+-------------+-----+----------------+-----+---+---+
| 1| abcdefcdwpnovo6| other| def| emi|store| 4| 1|
| 1| abcdefdwpnovo6| other| def| emi|store| 4| 1|
| 1|abcdefgenericpnovo6| other| def| emi|store| 4| 1|
+------+-------------------+-------------+-----+----------------+-----+---+---+
更新 1:
OP的问题仍然不清楚。下面试试。
spark.sql("""
with t1 ( select * from df a cross join df1 b ),
t2 ( select rec_no, campaign_name,campaign_team,terms,product_category,product, position(product_category,campaign_name) ps,
rank() over(partition by product_category order by rec_no) rk from t1 where position(product_category,campaign_name)>0
)
select * from t2 where rk=1 order by rec_no
""").show(truncate=False)
+------+---------------------------+-------------+---------+----------------+-------+---+---+
|rec_no|campaign_name |campaign_team|terms |product_category|product|ps |rk |
+------+---------------------------+-------------+---------+----------------+-------+---+---+
|1 |einsurancep09 |other |insurance|insurance |null |2 |1 |
|2 |estoreemicardcdwpnov06 |other |def |emi |store |7 |1 |
|2 |estoreemicardgenericspnov06|other |def |emi |store |7 |1 |
|2 |estoreemicardwmnov06 |other |def |emi |store |7 |1 |
+------+---------------------------+-------------+---------+----------------+-------+---+---+
推荐阅读
- java - 如何使用 Swagger Java 注释或代码生成附加属性:{}?
- reactjs - ./src/routeCollector/Page/MainPage/MainPage.tsx 未找到模块:无法解析“./common/HeaderBackground.svg”
- python - 对 Lambda 函数变量感到困惑
- lightgallery - 有没有办法将多个 lightGalleries 合并为一个?
- java - 由于 java.lang.NullPointerException,当我尝试从一个活动移动到另一个活动时,我的应用程序崩溃了
- javascript - 返回值但未出现在选择上
- sql - 800 万条数据的 Sql 索引不起作用问题
- video.js - 使用 nuevojs 在 videojs hls 流中以编程方式更改分辨率
- python-3.x - 为什么原始电子邮件中的表情符号是十六进制的以及如何在 Python 中对其进行解码
- python - 使用 groupby + lambda 自定义函数更新数据帧的每一行。应用和转换似乎不起作用