apache-spark - 使用 Spark 结构化流式传输仅保留最新数据
问题描述
我正在像这样流式传输数据:time
, id
,value
我只想为每个 保留一个记录id
,最新的value
. 处理这个问题的最佳方法是什么?更喜欢使用 Pyspark
解决方案
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("id").orderBy("time",'tiebreak')
df_s
.withColumn('tiebreak', monotonically_increasing_id())
.withColumn('rank', rank().over(window))
.filter(col('rank') == 1).drop('rank','tiebreak')
.show()
添加排名和平局以删除窗口分区之间和窗口分区内的重复或平局。
推荐阅读
- php - 使用PHP的fputcsv时如何写引号(“”)
- python - 试图做一个检查回文函数
- python - 是否可以使用没有插值的样条线(或任何其他 Python 库)使 matplotlib-plot 连接稀疏点?
- json - StatusCode:404,ReasonPhrase:'未找到',版本:1.1,内容:System.Net.Http.HttpConnectionResponseContent,
- aem - 体验片段的实时副本 - AEM 6.5
- sql-server - sqlservr.exe 总是低优先级
- powershell - 将大型 PBI 报告导入组工作区时出现 New-PowerBIReport 错误:“任务已取消。”
- docker - 错误无法将图像'library/web:latest'推送到注册表'docker.io'。错误:被拒绝:请求的资源访问被拒绝 - Kompose up
- angularjs - Linux 的角度错误
- java - Spring Boot + Java:来自 JSON 数据的基于关键字的搜索