首页 > 解决方案 > 如何在 Spark 结构化流中使用 UDF(用户定义函数)?

问题描述

我做了一点搜索。这个答案告诉我,我可以在 GroupedData 上使用 UDF,它可以工作,我可以用我自己的函数处理 GroupData 中的那些行和列。

根据官方教程。他们使用 groupBy() 和 window() 操作来表达窗口聚合,如下所示。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

我的问题是,是否有办法在words.groupBy(window(words.timestamp, "10 minutes", "5 minutes")上使用 UDF 。可能是下面的代码?我试过但它不起作用。

schema = StructType(
    [StructField("key", StringType()), StructField("avg_min", DoubleType())]
)

@panda_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    #whatever user-defined code 

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).apply(g)

标签: pandasapache-sparkspark-structured-streaming

解决方案


在 Spark 3 中,您可以使用,applyInPandas而不是显式的@pandas_udf(请参阅文档):

def g(df):
    #whatever user-defined code 

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).applyInPandas(g, schema=schema)

在这种情况下,您将获得 Pandas DataFrame 并返回 Pandas DataFrame。


推荐阅读