首页 > 解决方案 > Spark结构化流中的滞后函数

问题描述

我正在使用 Spark 2.3 结构化流并尝试使用“滞后”功能。但是,结构化流媒体似乎不支持延迟。

val output = spark.sql("SELECT temperature, time, lag(temperature, 1) OVER (ORDER BY time) AS PrevTemp FROM InputTable")

得到这个错误:

org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets; line 1 pos 0;

是否有另一种方法可以通过结构化流实现这种“滞后”功能?

谢谢!

标签: apache-sparkspark-structured-streaming

解决方案


据我所知,没有。

可能,您可以使用 mapGroupsWithState。例如:

    case class PayLoad(event_time: java.sql.Timestamp, data: String)

def mappingFunction(key: java.sql.Timestamp, values: Iterator[PayLoad], state: GroupState[PayLoad]): PayLoad = {
  ??? // Work with values iterator
}

val temperature: DataFrame = ???
temperature
  .withColumn("event_time", org.apache.spark.sql.functions.current_timestamp())
  .as[PayLoad]
  .groupByKey(_.event_time)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(mappingFunction)

您不需要保持状态,但通过这种方式您可以访问值迭代器并且您能够解决任何任务。

请记住,在这种情况下,所有微批处理数据都将转到一个分区,并且具有巨大的有效负载可能会导致巨大的延迟甚至 OOM。(以及与OVER (ORDER BY time)

希望能帮助到你。


推荐阅读