首页 > 解决方案 > 对结构化流数据帧执行统计查询

问题描述

我目前有一个结构化流数据帧,它将时间戳计数聚合到每小时类别(每小时窗口)中。虽然很高兴看到计数,但我的目标是找到有关正在运行的查询的一些信息,例如最大计数的小时、最小计数、每小时平均时间戳数、24 小时内的总计数等。 .

我的问题和疑问是如何访问和查询活动的、正在运行的 Streaming DataFrame 以提供此类信息。更进一步,我想为最大和最小条目提取整行(例如,产生最高/最低时间戳计数的小时以及计数的时间戳数量)

理想情况下,我想在 Streaming DataFrame 中显示每小时增加的时间戳计数,并在每批之后提供最大值、最小值等。但是,如果我不能同时完成两者,我完全可以接受最大值、最小值等。

作为参考,这是我正在使用的示例代码以及一些伪代码来显示我想要做什么。

val spark = SparkSession.builder.appName("Sample").getOrCreate
import spark.implicits._
val stream = spark.option("maxFilesPerTrigger", 1).text("file:///path/location")
val extracted = stream.select(extract_udf($value).cast(TimestampType) as 
  "timestamp").groupBy(window($"timestamp", "1 hour"), hour($"timestamp") as
  "hour").count.sort($"window")
val query = extracted.writeStream.queryName("time_table").outputMode("complete").
  format("console").start
query.awaitTermination

所需的伪代码

val max = findMaxRow(query) // Would extract from the row for a tuple (hour, count)
val min = // same as max
val runningCount = findTotalCount(query) // Would return the current count
val stats = List(max, min, count, etc)
val statsDF = stats.toDF
// Code to display stats along with the queried DataFrame
statsDF.writeStream.... ???

同样在任何人提到它之前,我已经尽可能多地搜索并且找不到答案。这篇文章是我能找到的最接近的答案,但没有解决如何查询活动流数据帧以及如何同时拥有流数据帧和统计数据帧的问题。

任何帮助表示赞赏,无论这是不可能的还是我需要采取不同的方法。

谢谢!

标签: scalaapache-sparkapache-spark-sqlspark-structured-streaming

解决方案


推荐阅读