首页 > 解决方案 > Pyspark SQL Pandas UDF:返回一个数组

问题描述

我正在尝试制作一个 pandas UDF,它包含两列具有整数值的列,并根据这些值之间的差异返回一个长度等于上述差异的小数数组。

到目前为止,这是我的尝试,我一直在用很多不同的方法来尝试让它发挥作用,但这是一般的想法

import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []

  for i in range(0, (x - y)):
    buffer.append(0.0)

  return buffer #correction provided by Ali Yessili

这是我如何使用它的示例

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

最终结果是df一个名为 ArrayType(DecimalType()) 列的新列,其长度zero_list看起来像(df.x - df.y)[0.0, 0.0, 0.0, ...]

错误消息太笼统了,几乎不值得发布,只是“由于阶段失败而中止作业”,它只追溯到我执行的代码部分df.show()

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

我希望有人能指出我正确的方向来制作一个 pandas udf,它会返回一个可变长度的数组,或者只是告诉我为什么我的代码或方法是错误的。

我正在使用带有 spark 2.3.1 的数据块来完成所有这些工作。

标签: pythonpandaspysparkpyspark-sqldatabricks

解决方案


这个问题是大约一年前的问题,但我遇到了同样的问题,这是我的解决方案pandas_udf

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

@pandas_udf(ArrayType(IntegerType()), PandasUDFType.SCALAR)
def zero_pad(xs,ys):
    buffer = []
    for idx, x in enumerate(xs):
        buffer.append([0]*int(x-ys[idx]))

    return pd.Series(buffer)

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

推荐阅读