首页 > 解决方案 > 如何使用 Java 在 Apache Flink 中对 DataStream 执行平均操作

问题描述

我正在尝试计算 Flink 中输入数据流(无窗口)的平均值

我使用映射器将流从 (key, value) 更改为 (key, value, 1)

现在我需要对第 2 和第 3 字段求和并将它们除以彼此。

输入数据流来自 'KEY VALUE' 形式的套接字连接,如 'X 5'

public class AvgViews {

DataStream<Tuple2<String, Double>> AvgViewStream = dataStream
                .map(new AvgViews.RowSplitter())
                .keyBy(0)
                //.??? 



    public static class RowSplitter implements
            MapFunction<String, Tuple3<String, Double, Integer>> {

        public Tuple3<String, Double, Integer> map(String row)
                throws Exception {
            String[] fields = row.split(" ");
            if (fields.length == 2) {
                return new Tuple3<String, Double, Integer>(
                        fields[0],
                        Double.parseDouble(fields[1]),
                        1);
            }
            return null;
        }
    }
}

标签: javaapache-flink

解决方案


您可以使用将 Tuple2 保持在键控状态的 RichMap(或 RichFlatMap)。您需要将每个传入记录添加到状态,并将平均值作为输出。

文档中的CountWindowAverage 示例做了类似的事情,虽然有点复杂。


推荐阅读