apache-spark - Spark结构化流中的滞后函数
问题描述
我正在使用 Spark 2.3 结构化流并尝试使用“滞后”功能。但是,结构化流媒体似乎不支持延迟。
val output = spark.sql("SELECT temperature, time, lag(temperature, 1) OVER (ORDER BY time) AS PrevTemp FROM InputTable")
得到这个错误:
org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;
是否有另一种方法可以通过结构化流实现这种“滞后”功能?
谢谢!
解决方案
据我所知,没有。
可能,您可以使用 mapGroupsWithState。例如:
case class PayLoad(event_time: java.sql.Timestamp, data: String)
def mappingFunction(key: java.sql.Timestamp, values: Iterator[PayLoad], state: GroupState[PayLoad]): PayLoad = {
??? // Work with values iterator
}
val temperature: DataFrame = ???
temperature
.withColumn("event_time", org.apache.spark.sql.functions.current_timestamp())
.as[PayLoad]
.groupByKey(_.event_time)
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mappingFunction)
您不需要保持状态,但通过这种方式您可以访问值迭代器并且您能够解决任何任务。
请记住,在这种情况下,所有微批处理数据都将转到一个分区,并且具有巨大的有效负载可能会导致巨大的延迟甚至 OOM。(以及与OVER (ORDER BY time)
)
希望能帮助到你。
推荐阅读
- sql - 设置事务表属性会导致外部表
- asp.net-core - 是否可以将 ffmpeg 命令的输出流式传输到具有 dot net core 的客户端?
- javascript - 如何在不影响工具提示标签的情况下更改图例标签?
- python - 深度优先搜索 - 从图的一个顶点到另一个顶点的返回路径包括不必要的顶点
- java - 使用多线程时我们是否应该始终使用 ConcurrentHashMap?
- javascript - JS以完美间隔运行函数
- java - 我正在尝试让 JButton Age 计算,但我有错误
- flask - 如何在所有页面(模板)中添加一个 html 块?
- sql - 删除日期两天内发生的行
- r - 在 R 中,如何按 ID 分组以使用现有值填充缺失值?