python - 类方法为 Pyspark udf
问题描述
我有以下代码
import numpy as np
import pandas as pd
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(a) + b
我还有一个带有双列的 Pyspark 数据框a
和b
. 我想跑
df.withColumn('c', MyClass(df['a']).f(df['b']))
这当然失败了。我如何正确调整代码MyClass
以使其正常工作。(请注意,我不能简单地用f
Pyspark 函数来编写函数。
解决方案
您可以添加一个 UDF 来包装该类:
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
class MyClass:
def __init__(self, a: pd.Series):
self.a = a
def f(self, b: pd.Series):
return np.exp(self.a) + b
@F.pandas_udf('float')
def myClassUDF(a: pd.Series, b: pd.Series) -> pd.Series:
return MyClass(a).f(b)
df = spark.createDataFrame([[0,1], [0,2]],['a','b'])
df.withColumn('c', myClassUDF('a','b')).show()
+---+---+---+
| a| b| c|
+---+---+---+
| 0| 1|2.0|
| 0| 2|3.0|
+---+---+---+
推荐阅读
- python-3.x - 有没有办法给一个函数提供一个变量并从第二个函数检索一个不同的变量并在第一个函数中使用它
- json - 做jmeter随机函数会面临瓶颈的情况
- sas - SAS,尾随@
- javascript - 如何在 JavaScript 和 NodeJS WebSocket 之间进行 Ping/Pong?
- c# - 使用 HostedService [ASP.NET CORE 3.1.0 C#] 运行 SQL 脚本 (Transact-SQL)
- python - matlab和python之间数据转换的差异
- jquery - 当行中达到值时,Datatable jquery调用php
- python - 使用 python 和 OpenCV 计算图像上的单元格
- python - Reg FLASK 环境设置和虚拟环境设置
- java - 远程 JVM 的 Java Mission Control(飞行记录器)内存/活动对象视图空白