首页 > 解决方案 > Pyspark:pandas_udf、grouped_agg 的多个参数

问题描述

我正在尝试应用带有两个参数的 pandas_udf。但我有这个错误。首先我尝试使用一个参数,没关系:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession \
        .builder \
        .config('spark.cores.max', 100) \
        .getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

这就是数据的样子

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

我的 pandas_udf 函数是

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf(v):
    cond = v<=3
    res = v[cond].count()
    return res
df.groupby("id").agg(count_udf(df['v'])).show()

结果是

+---+------------+
| id|count_udf(v)|
+---+------------+
|  1|         2.0|
|  2|         1.0|
+---+------------+

但是当我尝试为 pandas_udf 函数设置两个参数时,出现错误。

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf2(v, value):
    cond = v<=value
    res = v[cond].count()
    return res

df.groupby("id").agg(count_udf(df['v'],4)).show()

错误:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3267, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-18-468499490a1f>", line 1, in <module>
    res = df.groupby("id").agg(count_udf(df['v'],4))
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 169, in __call__
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in _to_seq
    cols = [converter(c) for c in cols]
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 53, in _to_java_column
    "function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: 4 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

标签: pythonpandaspysparkuser-defined-functions

解决方案


您可以在与调用函数相同的范围内定义 pandas_udf 函数。因此,所有局部变量都将在其中可见。

前任。:

def wrapper_count_udf():
  value = 4
  
  @pandas_udf("double", PandasUDFType.GROUPED_AGG)
  def count_udf(v):
    cond = v<=value
    res = v[cond].count()
    return res

  df.groupby("id").agg(count_udf(df['v'])).show()

推荐阅读