首页 > 解决方案 > 数据框上的 Pyspark UDF 列

问题描述

我正在尝试根据某些列的值在数据框上创建一个新列。它在所有情况下都返回 null。有人知道这个简单的例子出了什么问题吗?

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None

我希望所有行都会填写 get_profile 列。

我也试过这个:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())

达到同样的效果。

标签: pythonapache-sparkpyspark

解决方案


udf不知道列名是什么。因此,它会检查if/elif块中的每个条件,并且所有条件都评估为False. 因此该函数将返回None

您必须重写您udf要检查的列:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_profile(foo, bar, baz):
    if foo == 1:
        return 'Foo'
    elif bar == 1:
        return 'Bar'
    elif baz == 1 :
        return 'Baz'

spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

如果您有很多列并且想要全部传递它们(按顺序):

spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))

更一般地说,您可以解压缩任何有序的列列表:

cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))

但是这个特定的操作不需要udf. 我会这样做:

from pyspark.sql.functions import coalesce, when, col, lit

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#|  0|  1|  0|        Bar|
#|  1|  0|  0|        Foo|
#|  1|  1|  1|        Foo|
#+---+---+---+-----------+

这是有效的,因为如果条件评估为并且没有指定,默认情况下pyspark.sql.functions.when()将返回。然后列表推导将返回第一个非空列。nullFalseotherwisepyspark.sql.functions.coalesce

udf请注意,如果列的顺序与函数中评估的序列相同,则这等效于ONLY get_profile。更明确地说,你应该这样做:

spark_df.withColumn(
    "get_profile",
    coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])
).show()

推荐阅读