scala - 对结构化流数据帧执行统计查询
问题描述
我目前有一个结构化流数据帧,它将时间戳计数聚合到每小时类别(每小时窗口)中。虽然很高兴看到计数,但我的目标是找到有关正在运行的查询的一些信息,例如最大计数的小时、最小计数、每小时平均时间戳数、24 小时内的总计数等。 .
我的问题和疑问是如何访问和查询活动的、正在运行的 Streaming DataFrame 以提供此类信息。更进一步,我想为最大和最小条目提取整行(例如,产生最高/最低时间戳计数的小时以及计数的时间戳数量)
理想情况下,我想在 Streaming DataFrame 中显示每小时增加的时间戳计数,并在每批之后提供最大值、最小值等。但是,如果我不能同时完成两者,我完全可以接受最大值、最小值等。
作为参考,这是我正在使用的示例代码以及一些伪代码来显示我想要做什么。
val spark = SparkSession.builder.appName("Sample").getOrCreate
import spark.implicits._
val stream = spark.option("maxFilesPerTrigger", 1).text("file:///path/location")
val extracted = stream.select(extract_udf($value).cast(TimestampType) as
"timestamp").groupBy(window($"timestamp", "1 hour"), hour($"timestamp") as
"hour").count.sort($"window")
val query = extracted.writeStream.queryName("time_table").outputMode("complete").
format("console").start
query.awaitTermination
所需的伪代码
val max = findMaxRow(query) // Would extract from the row for a tuple (hour, count)
val min = // same as max
val runningCount = findTotalCount(query) // Would return the current count
val stats = List(max, min, count, etc)
val statsDF = stats.toDF
// Code to display stats along with the queried DataFrame
statsDF.writeStream.... ???
同样在任何人提到它之前,我已经尽可能多地搜索并且找不到答案。这篇文章是我能找到的最接近的答案,但没有解决如何查询活动流数据帧以及如何同时拥有流数据帧和统计数据帧的问题。
任何帮助表示赞赏,无论这是不可能的还是我需要采取不同的方法。
谢谢!
解决方案
推荐阅读
- django - 在 django ModelViewSet 中写一个更薄的视图
- java - 将父版本外部化到 POM 中的变量
- python - 如何仅在python中从一天中的特定时间选择数据?
- javascript - PhpStorm 如何命中 AJAX 断点
- python - TEnsorflow 2.1.0 TypeError:__new__()得到了一个意外的关键字参数'serialized_options'
- python - 将元素插入字典的有效方法
- c# - 为什么打印弹出窗口在打印助手中自动打开和关闭?
- python - 为什么在 Python 的块外可以访问 try-except 块内声明的变量?
- javascript - 如何从 JavaScript 中的运行计时器中减去 10 秒?
- python - 除了 OSError as (errcode , message): SyntaxError: invalid syntax