apache-spark - Pyspark 数据聚合与 Window 和索引上的滑动间隔
问题描述
我目前遇到的问题是我想在我的 csv 上使用一个窗口和滑动间隔,并为每个窗口执行数据聚合以获得最常见的类别。但是我没有时间戳,我想在索引列上执行窗口滑动。谁能指出我如何在索引上使用窗口+滑动间隔的正确方向?
简而言之,我想在索引列上创建窗口+间隔。
目前我有这样的事情:
schema = StructType().add("index", "string").add(
"Category", "integer")
dataframe = spark \
.readStream \
.option("sep", ",") \
.schema(schema) \
.csv("./tmp/input")
# TODO perform Window + sliding interval on dataframe, then perform aggregation per window
aggr = dataframe.groupBy("Category").count().orderBy("count", ascending=False).limit(3)
query = aggr \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
解决方案
对于基于每个窗口的聚合数据,您可以使用pyspark.sql.functions 包中的窗口函数。
对于时间间隔,您需要在数据框中添加时间戳列。
newDf = csvFile.withColumn("TimeStamp", current_timestamp())
当从 csv 读取数据时,此代码会在数据框中添加当前时间。
trimmedDf2 = newDf.groupBy(window(col("TimeStamp"), "5 seconds")).agg(sum("value")).select("window.start", "window.end", "sum(value)")
display(trimmedDf2)
上面的代码总结了值列并将它们分组在 5 秒时间戳窗口中。
您也可以使用上面的链接作为参考。
推荐阅读
- html - flex容器中的img占用额外空间?
- python - “AttributeError: 'AnonymousUserMixin' 对象没有属性”删除并重新创建 SQLAlchemy 表后
- python - 如何将熊猫数据框转换为 Json?
- objective-c - 如何设置“其他链接器标志”以在 CMake 中包含 -ObjC?
- api - 如何提取或查找带有相关交易所后缀或前缀的 Trading Economics 平台股票代码列表
- flutter - 字符串列表到 StatefulWidget Flutter 列表
- swift - 领域 Swift 错误:当前不支持包含嵌入对象的循环
- r - 如何在 R Studio 中将多个条形图转换为百分比条形图
- ios - 如何使用插值字符串作为 Firebase Cloud Functions 和 Typescript 的主题名称
- javascript - 如何将一个 div 的内容放在相邻的 div 之上?