首页 > 解决方案 > 使用 Spark 结构化流式传输仅保留最新数据

问题描述

我正在像这样流式传输数据:time, idvalue 我只想为每个 保留一个记录id,最新的value. 处理这个问题的最佳方法是什么?更喜欢使用 Pyspark

标签: apache-sparkpysparkspark-streamingdatabricks

解决方案


from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("id").orderBy("time",'tiebreak')
df_s
 .withColumn('tiebreak', monotonically_increasing_id())
 .withColumn('rank', rank().over(window))
 .filter(col('rank') == 1).drop('rank','tiebreak')
 .show()

添加排名和平局以删除窗口分区之间和窗口分区内的重复或平局。


推荐阅读