apache-spark - 如何计算火花结构化流中的移动平均值?
问题描述
我正在尝试根据之前的行而不是基于时间事件来计算火花结构化流中的移动平均值。
Kafka 有这样的字符串消息:device1@227.92@2021-08-19T12:15:13.540Z
有这个代码
Dataset<Row> lines = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "users")
.load()
.selectExpr("CAST(value AS STRING)")
.map((MapFunction<Row, Row>) row -> {
String message = row.getAs("value");
String[] newRow = message.split("@");
return RowFactory.create(newRow);
}, RowEncoder.apply(structType))
.selectExpr("CAST(item AS STRING)", "CAST(value AS DOUBLE)", "CAST(timestamp AS TIMESTAMP)");
上面的代码从 kafka 读取流并将字符串消息转换为行。
当我尝试这样做时:
WindowSpec threeRowWindow = Window.partitionBy("item").orderBy("timestamp").rowsBetween(Window.currentRow(), -3);
Dataset<Row> testWindow =
lines.withColumn("avg", functions.avg("value").over(threeRowWindow));
我收到此错误:org.apache.spark.sql.AnalysisException:流数据帧/数据集不支持非基于时间的窗口;
是否有任何其他方法可以计算移动平均值,因为每条消息都会随着新数据来自流而更新?或者默认情况下不支持任何非基于时间的操作来触发结构化流?
谢谢
解决方案
推荐阅读
- python-3.x - Python Bot 永远运行下去
- azure - 通过 ARM 模板的 Web App 和 Application Insights
- metal - Blit pvrtc 纹理金属
- powershell - 使用批处理powershell命令将文件中的单引号替换为两个单引号
- yocto - 执行 bitbake recipe 丢弃 sstate_cache 是什么
- php - 在 Woocommerce 中的订单详细信息表之前更改付款方式位置
- javascript - 当静态文件的真实引用不包含“静态”时,如何在 Django 中配置设置?
- javascript - 按下圆点时将活动类添加到外部按钮
- node.js - 即使我正在传递参数 carRentalId,也找不到客户的信息,客户的 id 得到 404
- javascript - 单击链接时如何刷新特定的 div 内容以及链接 ID?