python - PySpark: check whether a pattern from one column is present in another column
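Problem description
I have two PySpark DataFrames. One contains a FullAddress field (say col1), and the other contains the name of a city/town/suburb in one of its columns (say col2). I want to compare col2 with col1 and return col2 if there is a match.
In addition, the suburb name can be a list of suburb names.
The two DataFrames look like this (the suburb reference data first, then the full addresses):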
+--------+--------+--------------------------------------------------------+
|Postcode|District|City/ Town/ Suburb                                      |
+--------+--------+--------------------------------------------------------+
|2000    |Sydney  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2001    |Sydney  |Sydney                                                  |
|2113    |Sydney  |North Ryde                                              |
+--------+--------+--------------------------------------------------------+
+--------------------------------------------+
|FullAddress                                 |
+--------------------------------------------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia|
|HAY STREET HAYMARKET 2000, NSW, Australia   |
|SMART STREET FAIRFIELD 2165, NSW, Australia |
|CLARENCE STREET SYDNEY 2000, NSW, Australia |
+--------------------------------------------+
I want something like this:
+--------------------------------------------+----------+
|FullAddress                                 |suburb    |
+--------------------------------------------+----------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia|NORTH RYDE|
|HAY STREET HAYMARKET 2000, NSW, Australia   |HAYMARKET |
|SMART STREET FAIRFIELD 2165, NSW, Australia |NULL      |
|CLARENCE STREET SYDNEY 2000, NSW, Australia |SYDNEY    |
+--------------------------------------------+----------+
Solution
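There are two DataFrames:
DataFrame 1: contains the full addresses.
DataFrame 2: contains the base data: Postcode, District & City / Town / Suburb.
The aim is to extract the appropriate suburb for each row of DataFrame 1 from DataFrame 2. Although the OP has not explicitly specified the key on which the two DataFrames can be joined, Postcode seems to be the only reasonable choice.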
# Importing requisite functions
from pyspark.sql.functions import col,regexp_extract,split,udf
from pyspark.sql.types import StringType
Let's create DataFrame 1 as df. In this DataFrame we need to extract the Postcode. In Australia, all postcodes are 4 digits long, so we use regexp_extract() to extract a 4-digit number from the string column.
df = sqlContext.createDataFrame(
    [('BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia ',),
     ('HAY STREET HAYMARKET 2000, NSW, Australia',),
     ('SMART STREET FAIRFIELD 2165, NSW, Australia',),
     ('CLARENCE STREET SYDNEY 2000, NSW, Australia',)],
    ('FullAddress',))
df = df.withColumn('Postcode', regexp_extract('FullAddress', "(\\d{4})" , 1 ))
df.show(truncate=False)
+---------------------------------------------+--------+
|FullAddress                                  |Postcode|
+---------------------------------------------+--------+
|BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |2113    |
|HAY STREET HAYMARKET 2000, NSW, Australia    |2000    |
|SMART STREET FAIRFIELD 2165, NSW, Australia  |2165    |
|CLARENCE STREET SYDNEY 2000, NSW, Australia  |2000    |
+---------------------------------------------+--------+
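To see what the pattern does on its own, here is a minimal sketch that applies the same regular expression with Python's re module instead of Spark. The helper name extract_postcode and the second sample address (with a four-digit street number) are made up for illustration. Like regexp_extract(), the sketch returns an empty string when nothing matches.

```python
import re

# Same pattern as in the regexp_extract() call: the first run of four digits.
POSTCODE = re.compile(r"(\d{4})")

def extract_postcode(address):
    """Return the first four-digit run in the address, or '' if there is none."""
    match = POSTCODE.search(address)
    return match.group(1) if match else ""

print(extract_postcode("HAY STREET HAYMARKET 2000, NSW, Australia"))       # 2000
# Hypothetical address: a four-digit street number is picked up first.
print(extract_postcode("1234 HAY STREET HAYMARKET 2000, NSW, Australia"))  # 1234
```

The second call shows a limitation of the pattern: it takes the first four digits it finds, so addresses with four-digit street numbers would probably need a stricter pattern.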
Now that we have extracted the Postcode, we have the key to join the two DataFrames. Let's create DataFrame 2, from which we need to extract the corresponding suburb.
df_City_Town_Suburb = sqlContext.createDataFrame(
    [(2000, 'Sydney', 'Dawes Point, Haymarket, Millers Point, Sydney, The Rocks'),
     (2001, 'Sydney', 'Sydney'),
     (2113, 'Sydney', 'North Ryde')],
    ('Postcode', 'District', 'City_Town_Suburb'))
df_City_Town_Suburb.show(truncate=False)
+--------+--------+--------------------------------------------------------+
|Postcode|District|City_Town_Suburb                                        |
+--------+--------+--------------------------------------------------------+
|2000    |Sydney  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2001    |Sydney  |Sydney                                                  |
|2113    |Sydney  |North Ryde                                              |
+--------+--------+--------------------------------------------------------+
Join the two DataFrames with a left join -
df = df.join(df_City_Town_Suburb.select('Postcode','City_Town_Suburb'), ['Postcode'],how='left')
df.show(truncate=False)
+--------+---------------------------------------------+--------------------------------------------------------+
|Postcode|FullAddress                                  |City_Town_Suburb                                        |
+--------+---------------------------------------------+--------------------------------------------------------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |North Ryde                                              |
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null                                                    |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |Dawes Point, Haymarket, Millers Point, Sydney, The Rocks|
+--------+---------------------------------------------+--------------------------------------------------------+
Use the split() function to split the City_Town_Suburb column into an array -
df = df.select('Postcode','FullAddress',split(col("City_Town_Suburb"), r",\s*").alias("City_Town_Suburb"))
df.show(truncate=False)
+--------+---------------------------------------------+----------------------------------------------------------+
|Postcode|FullAddress                                  |City_Town_Suburb                                          |
+--------+---------------------------------------------+----------------------------------------------------------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |[North Ryde]                                              |
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null                                                      |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |[Dawes Point, Haymarket, Millers Point, Sydney, The Rocks]|
+--------+---------------------------------------------+----------------------------------------------------------+
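The split step can be illustrated without Spark. The sketch below uses Python's re.split with the same pattern (a comma followed by optional whitespace); the helper name split_suburbs is hypothetical, and the None branch mirrors the null that the unmatched row keeps after the left join.

```python
import re

def split_suburbs(value):
    """Split a comma-separated suburb string into a list; keep None as None."""
    if value is None:
        return None
    # A comma plus any following whitespace is the separator,
    # so the resulting names carry no leading spaces.
    return re.split(r",\s*", value)

print(split_suburbs("Dawes Point, Haymarket, Millers Point, Sydney, The Rocks"))
print(split_suburbs("North Ryde"))
print(split_suburbs(None))
```

Because the separator absorbs the whitespace after each comma, the names come out already trimmed, which is what the matching step relies on.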
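Finally, create a UDF that checks whether each element of the City_Town_Suburb array is present in the FullAddress column. As soon as one is found, it is returned; otherwise None is returned.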
def suburb(FullAddress, City_Town_Suburb):
    # Rows without a postcode match have no array (null after the left join),
    # so return None early instead of raising an error.
    if City_Town_Suburb is None:
        return None
    # Check each array element against 'FullAddress' and return
    # the first match immediately.
    for sub in City_Town_Suburb:
        if sub.strip().upper() in FullAddress:
            return sub.upper()
    return None

suburb_udf = udf(suburb, StringType())
Apply this UDF -
df = df.withColumn('suburb', suburb_udf(col('FullAddress'),col('City_Town_Suburb'))).drop('City_Town_Suburb')
df.show(truncate=False)
+--------+---------------------------------------------+----------+
|Postcode|FullAddress                                  |suburb    |
+--------+---------------------------------------------+----------+
|2113    |BADAJOZ ROAD NORTH RYDE 2113, NSW, Australia |NORTH RYDE|
|2165    |SMART STREET FAIRFIELD 2165, NSW, Australia  |null      |
|2000    |HAY STREET HAYMARKET 2000, NSW, Australia    |HAYMARKET |
|2000    |CLARENCE STREET SYDNEY 2000, NSW, Australia  |SYDNEY    |
+--------+---------------------------------------------+----------+
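Since the UDF body is plain Python, its matching logic can be tried outside Spark. The sketch below repeats that logic and adds a hypothetical variant, rightmost_suburb, which is not part of the original answer: it assumes the suburb is written closest to the postcode and therefore prefers the candidate that starts latest in the address. The address "HAYMARKET LANE SYDNEY 2000" is made up to show where the two differ.

```python
def suburb(full_address, suburbs):
    """Same logic as the UDF: the first listed suburb found in the address."""
    if suburbs is None:
        return None
    for sub in suburbs:
        if sub.strip().upper() in full_address:
            return sub.strip().upper()
    return None

def rightmost_suburb(full_address, suburbs):
    """Hypothetical variant: prefer the candidate that starts latest in the address."""
    if suburbs is None:
        return None
    best, best_pos = None, -1
    for sub in suburbs:
        name = sub.strip().upper()
        pos = full_address.upper().rfind(name)  # -1 when the name is absent
        if pos > best_pos:
            best, best_pos = name, pos
    return best

suburbs = ["Dawes Point", "Haymarket", "Millers Point", "Sydney", "The Rocks"]
print(suburb("HAY STREET HAYMARKET 2000, NSW, Australia", suburbs))
# Made-up address where a street name matches another suburb in the list:
print(suburb("HAYMARKET LANE SYDNEY 2000, NSW, Australia", suburbs))
print(rightmost_suburb("HAYMARKET LANE SYDNEY 2000, NSW, Australia", suburbs))
```

For the made-up address, the first-match logic returns HAYMARKET because it comes earlier in the list, while the variant returns SYDNEY. Whether that trade-off is worth it depends on how the real addresses are formatted.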