首页 > 解决方案 > 如何使用 withColumn 将额外的参数传递给 UDF

问题描述

如何使用 withColumn 将额外的参数传递给我的 UDF

df = spark.createDataFrame([
  ["aaa","1"],
  ["bbb","2"],
  ["ccc","5"]
]).toDF("text","id")

def title(x,y):
   if y:
      x = x.title()
   return x

title_udf = udf(lambda x: title(x,y), StringType())
spark.udf.register('title_udf', title_udf)

df = df.withColumn('text_title',title_udf('text',True)

当我尝试这个时,我得到一个错误:Invalid argument, not a string or column....

标签: pysparkuser-defined-functions

解决方案


udf 只能识别行元素。所以要传递一个固定的参数,你必须使用 lit() 函数。您的 udf 定义也必须更正。尝试这个:

import pyspark.sql.functions as F
from pyspark.sql.types import *
df = spark.createDataFrame([
  ["aaa","1"],
  ["bbb","2"],
  ["ccc","5"]
]).toDF("text","id")

def title(x,y):
   if y:
      x = x.title()
   return x

title_udf = F.udf(title, StringType())

df = df.withColumn('text_title',title_udf('text',F.lit(True)))

 df.show()
+----+---+----------+
|text| id|text_title|
+----+---+----------+
| aaa|  1|       Aaa|
| bbb|  2|       Bbb|
| ccc|  5|       Ccc|
+----+---+----------+

正如评论中的@powers 所示,如果此输出是您的最终目的,那么您可以使用 initcap() 函数在没有 udf 的情况下执行此操作

df = df.withColumn("text_title",F.when(F.lit(True),F.initcap(F.col('text'))).otherwise(F.col('text')))

您还可以使用其他列作为条件,例如“id”列

df = df.withColumn("text_title",F.when(F.col('id')>2,F.initcap(F.col('text'))).otherwise(F.col('text')))

推荐阅读