首页 > 解决方案 > 对 pyspark 数据框的多列应用不同的函数

问题描述

我有一个包含几列的 pyspark 数据框

col1    col2    col3
---------------------
1.      2.1.    3.2
3.2.    4.2.    5.1

我想将三个函数f1(x), f2(x), f3(x)分别应用于数据框的对应列,以便我得到

col1        col2        col3
-------------------------------
f1(1.)      f2(2.1.)    f3(3.2)
f1(3.2.)    f2(4.2.)    f3(5.1)

我试图避免为每一列定义一个 udf,所以我的想法是从应用函数的每一列构建一个 rdd(可能是带有索引的 zip,我也可以在原始数据集中定义它),然后重新加入原始数据框。

这是一个可行的解决方案,还是有办法做得更好?

更新:按照@Andre' Perez 的建议,我可以为每列定义一个 udf 并使用 spark sql 来应用它,或者

import numpy as np
import pyspark.sql.functions as F
f1_udf = F.udf(lambda x: float(np.sin(x)), FloatType())
f2_udf = F.udf(lambda x: float(np.cos(x)), FloatType())
f3_udf = F.udf(lambda x: float(np.tan(x)), FloatType())


df = df.withColumn("col1", f1_udf("col1"))
df = df.withColumn("col2", f2_udf("col2"))
df = df.withColumn("col3", f3_udf("col3"))

标签: apache-sparkpysparkuser-defined-functionsrdd

解决方案


也许最好将这些函数注册为 UDF(即使您说您不想遵循这种方法)。

spark.udf.register("func1", f1)
spark.udf.register("func2", f2)
spark.udf.register("func3", f3)

然后,我会将 DataFrame 注册为临时视图,并使用注册的函数在其上运行 Spark SQL 查询。

df.createOrReplaceTempView("dataframe")
df2 = spark.sql("select func1(col1), func2(col2), func3(col3) from dataframe")

推荐阅读