pyspark - 如何使用具有火花数据流结构的非基于时间的窗口?
问题描述
我正在尝试使用带有 spark 和 kafka 的结构化流的窗口。我在非基于时间的数据上使用窗口,所以我得到这个错误:
'Non-time-based windows are not supported on streaming DataFrames/Datasets;;\nWindow
这是我的代码:
window = Window.partitionBy("input_id").orderBy("similarity")
outputDf = inputDf\
.crossJoin(ticketDf.withColumnRenamed("IDF", "old_IDF")) \
.withColumn("similarity", cosine_similarity_udf(col("IDF"), col("old_IDF"))) \
.withColumn("rank", rank().over(window)) \
.filter(col("rank") < 10)
因此,我正在寻找在非基于时间的数据上使用窗口的提示或参考...
解决方案
over()
Spark Structured Streaming 不支持传统的 SQL 窗口化(它唯一支持的窗口化是基于时间的窗口化)。如果您考虑一下,可能是为了避免混淆。有些人可能错误地认为 Spark Structured Streaming 可以基于列对整个数据进行分区(这是不可能的,因为流是无界的输入数据)。
您可以改为使用groupBy()
.
groupBy()
也是一个状态完整的操作,在append
模式下是不可能实现的,除非我们在列列表中包含一个时间戳列,我们想要对其进行 groupBy 操作。例如:
df_result = df.withWatermark("createdAt", "10 minutes" ) \
.groupBy( F.col('Id'), window(F.col("createdAt"), self.acceptable_time_difference)) \
.agg(F.max(F.col('createdAt')).alias('maxCreatedAt'))
在这个例子createdAt
中是一个时间戳类型的列。请注意,在这种情况下,我们必须withWatermrke
事先调用时间戳列,因为 Spark 无法无限存储状态。
ps:我知道 groupBy 的功能并不完全像窗口化,但是通过简单的连接或自定义函数 with mapGroupsWithState
,您可能能够实现所需的功能。
推荐阅读
- angular - AngularJS:所选选项不起作用
- amazon-web-services - 更改已在运行的 Amazon MQ 代理的子网
- docker - `docker build --network 容器的可能用例是什么:
`? - c# - 在 C# 应用程序设置期间 SQL Server 登录失败
- visual-studio-code - VSCode - 设置同步 - 无法写入文件 EACCES
- rest - 如何在 REST API 的 cloudformation 模板中指定 TLS 版本?
- java - Spring Data MongoDB:MergeOperation 返回整个集合。为什么?
- elasticsearch - 需要嵌套排序弹性搜索查询
- reactjs - 用滑块对图像变化做出反应(不是轮播)
- javascript - 数据关系 - 如何在外键上允许 Null