首页 > 解决方案 > 如何计算火花结构化流中的移动平均值?

问题描述

我正在尝试根据之前的行而不是基于时间事件来计算火花结构化流中的移动平均值。

Kafka 有这样的字符串消息:device1@227.92@2021-08-19T12:15:13.540Z

有这个代码

Dataset<Row> lines = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .map((MapFunction<Row, Row>) row -> {
                    String message = row.getAs("value");
                    String[] newRow = message.split("@");
                    return RowFactory.create(newRow);
                    }, RowEncoder.apply(structType))
                .selectExpr("CAST(item AS STRING)", "CAST(value AS DOUBLE)", "CAST(timestamp AS TIMESTAMP)");

上面的代码从 kafka 读取流并将字符串消息转换为行。

当我尝试这样做时:

WindowSpec threeRowWindow = Window.partitionBy("item").orderBy("timestamp").rowsBetween(Window.currentRow(), -3);

Dataset<Row> testWindow = 
lines.withColumn("avg",  functions.avg("value").over(threeRowWindow));

我收到此错误:org.apache.spark.sql.AnalysisException:流数据帧/数据集不支持非基于时间的窗口;

是否有任何其他方法可以计算移动平均值,因为每条消息都会随着新数据来自流而更新?或者默认情况下不支持任何非基于时间的操作来触发结构化流?

谢谢

标签: apache-sparkapache-spark-sqlspark-structured-streaming

解决方案


推荐阅读