首页 > 解决方案 > Pyspark 数据聚合与 Window 和索引上的滑动间隔

问题描述

我目前遇到的问题是我想在我的 csv 上使用一个窗口和滑动间隔,并为每个窗口执行数据聚合以获得最常见的类别。但是我没有时间戳,我想在索引列上执行窗口滑动。谁能指出我如何在索引上使用窗口+滑动间隔的正确方向?

简而言之,我想在索引列上创建窗口+间隔。

目前我有这样的事情:

schema = StructType().add("index", "string").add(
    "Category", "integer")
                                                                             
dataframe = spark \
    .readStream \
    .option("sep", ",") \
    .schema(schema) \
    .csv("./tmp/input")

# TODO perform Window + sliding interval on dataframe, then perform aggregation per window
aggr = dataframe.groupBy("Category").count().orderBy("count", ascending=False).limit(3)

query = aggr \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

标签: apache-sparkpysparkspark-structured-streaming

解决方案


对于基于每个窗口的聚合数据,您可以使用pyspark.sql.functions 包中的窗口函数。

对于时间间隔,您需要在数据框中添加时间戳列。

newDf = csvFile.withColumn("TimeStamp", current_timestamp())

当从 csv 读取数据时,此代码会在数据框中添加当前时间。

trimmedDf2 = newDf.groupBy(window(col("TimeStamp"), "5 seconds")).agg(sum("value")).select("window.start", "window.end", "sum(value)")

display(trimmedDf2)

上面的代码总结了值列并将它们分组在 5 秒时间戳窗口中。

这是代码的输出

在 Spark 中使用 Windows 函数进行每周聚合

您也可以使用上面的链接作为参考。


推荐阅读