PySpark pattern matching and assigning associated values

Problem description

df1

campaign_name                 campaign_team
einsurancep09                 other
estoreemicardcdwpnov06        other
estoreemicardwmnov06          other
estoreemicardgenericspnov06   other

df2

terms       product_category   product
insurance   insurance          null
def         emi                store
ab          bhi                asd
de          lic                cards
a           credit             cards

Here is my scenario:

  1. The 'terms' column of df2 is sorted by string length in descending order.
  2. Each terms value should be compared against campaign_name in df1 using a contains/like match.
  3. Whichever terms string matches the campaign_name first, its product_category and product should be picked and added to df1 as new columns.
  4. For the campaign_name value "einsurancep09", the terms value "insurance" is contained in the campaign_name, so its product_category and product are picked up and added to the df1 output.
  5. Another example: in the remaining 3 records, "def", "ab" and "de" are all contained in the campaign_name string, but we pick the product_category and product of "def" because it appears first, being the longest compared with "ab" and "de".

Below is my code:

df1 = df1.withColumn("product_category",when(df1.campaign_name.contains(df2.terms),df2.product_category).otherwise('other'))

However, it gives me the following error:

   raise converted from None
pyspark.sql.utils.AnalysisException: Resolved attribute(s) terms#37,product_category#38 missing from campaign_name#16,campaign_team#17 in operator !Project [campaign_name#16, campaign_team#17, CASE WHEN Contains(campaign_name#16, terms#37) THEN product_category#38 ELSE other END AS product_category#44].;
!Project [campaign_name#16, campaign_team#17, CASE WHEN Contains(campaign_name#16, terms#37) THEN product_category#38 ELSE other END AS product_category#44]
+- Relation[campaign_name#16,campaign_team#17] csv

So where am I going wrong?

Based on the posted answer, I get the following output:

+---------------+-------------+---------+----------------+-------+
|campaign_name  |campaign_team|terms    |product_category|product|
+---------------+-------------+---------+----------------+-------+
|einsurancepnm06|other        |insurance|Insurance       |NaN    |
+---------------+-------------+---------+----------------+-------+

Expected output: (provided as an image in the original post)

Tags: python, pandas, apache-spark, pyspark, apache-spark-sql

Solution


Assumption

Dataset df1 should have an ordering that satisfies the OP's requirement, so I introduced a rec_no column.

df = spark.sql("""
select 'abcdefcdwpnovo6' campaign_name, 'other' campaign_team union all
select 'abcdefdwpnovo6' , 'other' union all
select 'abcdefgenericpnovo6' , 'other' 
""")
df.createOrReplaceTempView("df")
df.show()

+-------------------+-------------+
|      campaign_name|campaign_team|
+-------------------+-------------+
|    abcdefcdwpnovo6|        other|
|     abcdefdwpnovo6|        other|
|abcdefgenericpnovo6|        other|
+-------------------+-------------+

df1 = spark.sql("""
select 1 rec_no, 'def' terms, 'emi' product_category, 'store' product union all 
select 2, 'abc' ,'bhi' ,'asd' union all
select 3, 'de' ,'lic' ,'cards' union all
select 4, 'a' ,'credit' ,'cards' 
""")
df1.createOrReplaceTempView("df1")
df1.show()

+------+-----+----------------+-------+
|rec_no|terms|product_category|product|
+------+-----+----------------+-------+
|     1|  def|             emi|  store|
|     2|  abc|             bhi|    asd|
|     3|   de|             lic|  cards|
|     4|    a|          credit|  cards|
+------+-----+----------------+-------+
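
If the ordering is meant to follow the OP's rule (terms sorted by descending string length) rather than a hand-assigned rec_no, the column could also be derived. A minimal sketch, assuming a terms table without rec_no:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch only: derive rec_no from descending term length instead of hard-coding it.
terms_df = df1.drop("rec_no")
terms_ranked = terms_df.withColumn(
    "rec_no",
    F.row_number().over(Window.orderBy(F.length("terms").desc()))
)
terms_ranked.show()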

Output:

You can drop the ps, rk, and rec_no columns (a sketch of that cleanup follows the output below).

spark.sql("""
with t1 ( select * from df a cross join df1 b ),
     t2 ( select rec_no, campaign_name,campaign_team,terms,product_category,product x, position(terms,campaign_name) ps,
    rank() over(order by rec_no) rk from t1  where position(terms,campaign_name)>0 )
    select * from t2 where rk=1
""").show()

+------+-------------------+-------------+-----+----------------+-----+---+---+
|rec_no|      campaign_name|campaign_team|terms|product_category|    x| ps| rk|
+------+-------------------+-------------+-----+----------------+-----+---+---+
|     1|    abcdefcdwpnovo6|        other|  def|             emi|store|  4|  1|
|     1|     abcdefdwpnovo6|        other|  def|             emi|store|  4|  1|
|     1|abcdefgenericpnovo6|        other|  def|             emi|store|  4|  1|
+------+-------------------+-------------+-----+----------------+-----+---+---+
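
For example, a minimal sketch of that cleanup (the variable name result is my own), assigning the query above to a DataFrame and removing the helper columns with drop:

result = spark.sql("""
with t1 as ( select * from df a cross join df1 b ),
     t2 as ( select rec_no, campaign_name, campaign_team, terms, product_category, product x,
                    position(terms, campaign_name) ps,
                    rank() over (order by rec_no) rk
             from t1
             where position(terms, campaign_name) > 0 )
select * from t2 where rk = 1
""")

# Drop the helper columns before presenting the final result.
result.drop("rec_no", "ps", "rk").show(truncate=False)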

Update 1:

The OP's question is still unclear. Try the query below.

spark.sql("""
with t1 ( select * from df a cross join df1 b ),
     t2 ( select rec_no, campaign_name,campaign_team,terms,product_category,product, position(product_category,campaign_name) ps,
    rank() over(partition by product_category order by rec_no) rk from t1 where position(product_category,campaign_name)>0 
    )
    select * from t2 where rk=1 order by rec_no
""").show(truncate=False)

+------+---------------------------+-------------+---------+----------------+-------+---+---+
|rec_no|campaign_name              |campaign_team|terms    |product_category|product|ps |rk |
+------+---------------------------+-------------+---------+----------------+-------+---+---+
|1     |einsurancep09              |other        |insurance|insurance       |null   |2  |1  |
|2     |estoreemicardcdwpnov06     |other        |def      |emi             |store  |7  |1  |
|2     |estoreemicardgenericspnov06|other        |def      |emi             |store  |7  |1  |
|2     |estoreemicardwmnov06       |other        |def      |emi             |store  |7  |1  |
+------+---------------------------+-------------+---------+----------------+-------+---+---+
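
As a side note (my own sketch, not part of the original answer), the same idea can be expressed with the DataFrame API, which also avoids the AnalysisException from the question: cross join the two datasets, keep only the rows where campaign_name contains terms, and keep one match per campaign_name using a window ordered by rec_no. Campaigns with no matching term are dropped here; a left join would be needed to keep them with a default category such as 'other'.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Sketch only: df holds the campaigns and df1 the terms, as defined above.
matches = (
    df.crossJoin(df1)
      .where(F.col("campaign_name").contains(F.col("terms")))
)

# Keep one row per campaign_name: the matching term with the smallest rec_no.
w = Window.partitionBy("campaign_name").orderBy("rec_no")

(matches
    .withColumn("rk", F.row_number().over(w))
    .where(F.col("rk") == 1)
    .drop("rk", "rec_no", "terms")
    .show(truncate=False))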
