python - UDF 在 PySpark 中运行两次
问题描述
我有一个简单的 spark 数据框,它有两列,都是字符串;一个叫id
,另一个叫name
。我还有一个 Python 函数string_replacement
,它可以进行一些字符串操作。我已经定义了一个包装器 UDF,它包含string_replacement
并应用于数据帧的每一行。只有name
列被传递给字符串操作函数。这是代码
# Import libraries
from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.sql.types import *
# Create Example Dataframe
row1 = Row(id='123456', name='Computer Science')
df = spark.createDataFrame([row1])
# Print the dataframe
df.show()
# Define function that does some string operations
def string_replacement(input_string):
string=input_string
string=string.replace('Computer', 'Computer x')
string=string.replace('Science', 'Science x')
return string
# Define wrapper function to turn into UFD
def wrapper_func(row):
temp=row[1]
temp=string_replacement(temp)
row[1]=temp
return row
# Create the schema for the resulting data frame
output_schema = StructType([StructField('id', StringType(), True),
StructField('name', StringType(), True)])
# UDF to apply the wrapper function to the dataframe
new_udf=f.udf(lambda z: wrapper_func(z), output_schema)
cols=df.columns
new_df=df.select(new_udf(f.array(cols)).alias('results')).select(f.col('results.*'))
new_df.show(truncate = False)
该函数获取单词Computer
并将其转换为Computer x
. 对 word 也是如此Science
。
原始数据框如下所示
+------+----------------+
| id| name|
+------+----------------+
|123456|Computer Science|
+------+----------------+
应用该功能后,它看起来像这样
+------+------------------------+
|id |name |
+------+------------------------+
|123456|Computer x x Science x x|
+------+------------------------+
从 s 可以看出x x
,它已经运行了两次该函数。第二次关于第一次运行的输出。我怎样才能避免这种行为?
有趣的是,如果我不分解生成的数据框,它看起来还不错:
new_df=df.select(new_udf(f.array(cols)).alias('results'))
给你
+-----------------------------+
|results |
+-----------------------------+
|[123456,Computer x Science x]|
+-----------------------------+
解决方案
使用星形展开似乎会导致为每个展开的元素运行一次 UDF,如此处所示。
df.select(new_udf(F.array(cols)).alias('results')).select(F.col('results.*')).explain()
# == Physical Plan ==
# *(1) Project [pythonUDF1#109.id AS id#104, pythonUDF1#109.name AS name#105]
# +- BatchEvalPython [<lambda>(array(id#0, name#1)), <lambda>(array(id#0, name#1))], [id#0, name#1, pythonUDF0#108, pythonUDF1#109]
# +- Scan ExistingRDD[id#0,name#1]
如果要保留当前的代码结构,可以通过将其包装在数组中并进行分解来解决问题。
df.select(F.explode(F.array(new_udf(F.array(cols)))).alias('results')).select(F.col('results.*')).show(truncate=False)
# +------+--------------------+
# |id |name |
# +------+--------------------+
# |123456|Computer x Science x|
# +------+--------------------+
根据您的用例,如果您可以以这种方式重新实现 UDF,即每行仅处理特定列而不是整行,则代码更具可读性。
def rep_str(string):
res = string
res = res.replace('Computer', 'Computer x')
res = res.replace('Science', 'Science x')
return res
rep_str_udf = F.udf(lambda s: rep_str(s), StringType())
df.withColumn('new_name', rep_str_udf(df.name)).show()
# +------+----------------+--------------------+
# | id| name| new_name|
# +------+----------------+--------------------+
# |123456|Computer Science|Computer x Science x|
# +------+----------------+--------------------+
推荐阅读
- arrays - Shell中的数组操作
- r - 使用“last”和“lag”时子集“维数不正确”中的错误
- perl - 如何为 firefox 添加 extra_capabilities 以使用 perl 使用 selenium 进行测试?
- c# - 从 MySQL 数据库中获取数百万行并写入 csv 的性能缓慢
- javascript - 覆盖 HTML DOM 元素方法 JS
- java - 在Android设备上按下后退按钮后吐司没有完成?
- prolog - 如何计算列表中列表的所有长度?
- windows - stdole2.tlb Windows 10 与早期版本 Windows 上的 stdole2.tlb
- android - 广播到房间在socket.io与android的每一秒连接上都有效
- javascript - 在 ReactJS 中使用“this”关键字调用函数