python - 在 pandas udf pyspark 中使用 numpy
问题描述
我正在尝试定义一个 pandas udf 来计算每个周期的对数正态分布的偏斜。
我目前做了以下事情:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def lognormal_skew(v):
return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)
my_df.groupBy('period').agg(lognormal_skew(my_df['my_columns'])).show()
但是我收到一个错误:
rg.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3047.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3047.0 (TID 208812, 10.139.64.8, executor 82): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
我的猜测是,如果我尝试按如下方式定义偏斜,这与numpy
因为:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def skew(v):
return v.skew()
my_df.groupBy('period').agg(skew(my_df['my_columns'])).show()
它输出 aDataFrame
并且不会出错。
解决方案
前言
根据我的经验,我认为只要可以使用pyspark
内置函数来实现某些东西,它就比用户定义的函数更可取。
udf 的问题之一是错误消息难以解密。例如,在您的情况下,我不知道您为什么会遇到此错误。
pyspark.sql.functions
如果您接受在更多步骤中这样做,则允许您做很多事情。但是,就性能而言,这将很难被击败,因为这些功能是由专家优化的。如果您想做的事情无法完成pyspark.sql.functions
(发生这种情况),我更喜欢使用rdd
than udf
。rdd
为了应用Python
功能更自然。相对于内置DataFrame
方法,您失去了性能,但您获得了一些灵活性。
也许有关您的问题的示例可能具有启发性。
Python
让我们以基于 numpy 的示例为例。你给出了python
实现:
import numpy as np
def lognormal_skew_numpy(v):
return (np.exp(v.std()) + 2) * np.sqrt(np.exp(v.std()) - 1)
它可用于控制其他实现是否一致:
print(lognormal_skew_numpy(np.array([1,3,5])))
print(lognormal_skew_numpy(np.array([5,6])))
# 14.448897615797454
# 2.938798148174726
一个DataFrame API
逻辑
现在,让我们来看看Spark
。我将使用以下内容DataFrame
:
df = spark.createDataFrame([(1, 'a'), (3, 'a'), (5, 'a'), (5,'b'), (6,'b')], ['x','period'])
df.show(2)
+---+------+
| x|period|
+---+------+
| 1| a|
| 3| a|
+---+------+
only showing top 2 rows
偏度函数只执行基本的数学运算。它们都已实现,pyspark.sql.functions
因此在这种情况下创建一个执行此操作的函数并不难
import pyspark.sql.functions as psf
def lognormal_skew(df, xvar = 'x'):
df_agg = (df
.groupBy('period')
.agg(psf.stddev_pop(xvar).alias('sd'))
)
df_agg = df_agg.withColumn('skew', (psf.exp(psf.col('sd')) + 2)*psf.sqrt(psf.exp('sd') - 1))
return df_agg
请注意,计算标准差时存在不同的函数psf
:我使用stddev_pop
的函数效率较低,但报告的是总体水平方差,而不是估计量(如果有 3 或 2 个点,估计量的精度会很差)。
我们可以控制它产生所需的输出:
lognormal_skew(df).show(2)
+------+-----------------+------------------+
|period| sd| skew|
+------+-----------------+------------------+
| b| 0.5| 2.938798148174726|
| a|1.632993161855452|14.448897615797454|
+------+-----------------+------------------+
我们设法用纯DataFrame
逻辑得到了预期的结果。
rdd
让我们将数据排列成一个rdd
看起来像并行化的 numpy 数组:
rdd = df.rdd
rdd = rdd.mapValues(lambda l: l).map(lambda l: (l[1], [l[0]] )).reduceByKey(lambda x,y: x + y)
rdd.take(2)
[('b', [5, 6]), ('a', [1, 3, 5])]
在这里,我们使用reduceByKey
将值分组到一个列表中。在这一步,随着大量数据,您可能会使您的 RAM 爆炸。
最后,您可以轻松地将函数与该结构并行:
rdd = rdd.map(lambda l: (l[0], np.array(l[1]))).map(lambda l: (l[0], lognormal_skew_numpy(l[1])))
rdd.take(2)
[('b', 2.938798148174726), ('a', 14.448897615797454)]
我们再次得到同样的结果。我发现这种方法有两个缺陷:
- 它的可读性和便携性较差。如果你想用不同的数据集重用代码,你将不得不做更多的工作
- 它的效率较低(速度和内存)。这里的
reduceByKey
操作是主要瓶颈。
但是,您获得了一些灵活性。这是一个权衡。
推荐阅读
- c# - 是否有适用于 Windows C# 的 API 允许打印的文本永远留在特定位置?
- php - 通过 Vimeo API 上传水印以覆盖视频
- c# - 如何保存难度级别并在控制台应用程序中下次启动时使用它
- javascript - 我应该如何删除 Firebase 存储文件?
- javascript - 显示复数作为答案
- firebase - firebase auth 是否可以限制某些用户的登录?
- html - 字符串中的链接在 html 代码中不启用
- python - 将文件记录到数据框中
- linq - 如何使用 LLBLGen Pro 将两个字符串连接成一个
- python - 使用 Flask 的 Python 网页中不允许使用“POST”方法