首页 > 解决方案 > 将 PySpark 命令转换为自定义函数

问题描述

我想知道是否可以将一系列 PySpark 命令打包成一个函数,以便这样的函数接受一个数据帧并将它们应用于数据帧。我们用 Python 做的事情。

例如,我有以下数据框:

sevents_df.show(5)

+-------+--------+-------------+----------------+------------+-----+
|Counter|Duration|StartTime    |TypeEnumeration |Floor_Number|Value|
+-------+--------+-------------+----------------+------------+-----+
|    1.0|    5460|1503067077370|UC_001          |         NaN|  NaN|
|    1.0|     322|1503067090480|UC_008          |         NaN|  NaN|
|    1.0|     990|1503067099300|UC_001          |         NaN|  NaN|
|    1.0|    5040|1503067396060|UC_001          |         NaN|  NaN|
|    1.0|    6090|1503067402150|UC_001          |         NaN|  NaN|
+-------+--------+-------------+----------------+------------+-----+

步骤 1. 我做的第一件事是过滤掉类型。我只是保持UC_001

sevents_filter = sevents_df.filter(sevents_df['TypeEnumeration'].isin(['UC_001']) == True)

步骤 2. 删除一些列:

columns_to_drop = ['Comments', 'Floor_Number', 'Value']
sevents_clean = sevents_filter.drop(*columns_to_drop)

步骤 3. 转换StartTime为日期

def convert_to_seconds(x):
    return x/1000

udf_myFunction = udf(convert_to_seconds, IntegerType())
sevents2 = sevents2.withColumn("StartTime", udf_myFunction("StartTime"))
sevents4 = sevents2.withColumn('epoch',
                               f.date_format(sevents2.StartTime.cast(dataType=t.TimestampType()),"yyyy-MM-dd"))

我想把这三个步骤放在一个函数中,比如:

some udf pySpark_function(dataframe):
    step 1
    step 2
    step 3

我想这样做的原因是因为如果我有N数据框,我无法想象编写这些步骤N的时间。

一种解决方案是将这些N帧连接成一帧,并通过这些步骤将这一巨大帧传递一次。有没有其他方法可以一次通过一帧?

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


AnUDF用于处理数据框列中的值,不能用于处理整个数据框。相反,创建一个接受数据帧并返回已处理数据帧的普通方法。

def process_df(df):
    df = df.filter(df['TypeEnumeration'] == 'UC_001')

    columns_to_drop = ['Comments', 'Floor_Number', 'Value']
    df = df.drop(*columns_to_drop)

    df = df.withColumn('epoch', f.date_format((df.StartTime / 1000).cast(t.TimestampType()), "yyyy-MM-dd"))

    return df

然后简单地遍历所有数据帧并使用上述方法。

注意:我对代码做了一些简化。没有必要,isin因为您只使用单个值进行过滤,并且不需要UDF除以 1000。如果可能,最好使用内置的 Spark 函数而不是自定义 a UDF,它会更快。


推荐阅读