首页 > 解决方案 > 如何使用具有火花数据流结构的非基于时间的窗口?

问题描述

我正在尝试使用带有 spark 和 kafka 的结构化流的窗口。我在非基于时间的数据上使用窗口,所以我得到这个错误:

'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow

这是我的代码:

window = Window.partitionBy("input_id").orderBy("similarity")
outputDf = inputDf\
        .crossJoin(ticketDf.withColumnRenamed("IDF", "old_IDF")) \
        .withColumn("similarity", cosine_similarity_udf(col("IDF"), col("old_IDF"))) \
        .withColumn("rank", rank().over(window)) \
        .filter(col("rank") < 10)

因此,我正在寻找在非基于时间的数据上使用窗口的提示或参考...

标签: pysparkapache-spark-sqlspark-streaming

解决方案


over()Spark Structured Streaming 不支持传统的 SQL 窗口化(它唯一支持的窗口化是基于时间的窗口化)。如果您考虑一下,可能是为了避免混淆。有些人可能错误地认为 Spark Structured Streaming 可以基于列对整个数据进行分区(这是不可能的,因为流是无界的输入数据)。

您可以改为使用groupBy(). groupBy()也是一个状态完整的操作,在append模式下是不可能实现的,除非我们在列列表中包含一个时间戳列,我们想要对其进行 groupBy 操作。例如:

df_result = df.withWatermark("createdAt", "10 minutes" ) \
              .groupBy( F.col('Id'), window(F.col("createdAt"), self.acceptable_time_difference)) \
              .agg(F.max(F.col('createdAt')).alias('maxCreatedAt'))

在这个例子createdAt中是一个时间戳类型的列。请注意,在这种情况下,我们必须withWatermrke事先调用时间戳列,因为 Spark 无法无限存储状态。

ps:我知道 groupBy 的功能并不完全像窗口化,但是通过简单的连接或自定义函数 with mapGroupsWithState,您可能能够实现所需的功能。


推荐阅读