java - 如何使用 Java 在 Apache Flink 中对 DataStream 执行平均操作
问题描述
我正在尝试计算 Flink 中输入数据流(无窗口)的平均值
我使用映射器将流从 (key, value) 更改为 (key, value, 1)
现在我需要对第 2 和第 3 字段求和并将它们除以彼此。
输入数据流来自 'KEY VALUE' 形式的套接字连接,如 'X 5'
public class AvgViews {
DataStream<Tuple2<String, Double>> AvgViewStream = dataStream
.map(new AvgViews.RowSplitter())
.keyBy(0)
//.???
public static class RowSplitter implements
MapFunction<String, Tuple3<String, Double, Integer>> {
public Tuple3<String, Double, Integer> map(String row)
throws Exception {
String[] fields = row.split(" ");
if (fields.length == 2) {
return new Tuple3<String, Double, Integer>(
fields[0],
Double.parseDouble(fields[1]),
1);
}
return null;
}
}
}
解决方案
您可以使用将 Tuple2 保持在键控状态的 RichMap(或 RichFlatMap)。您需要将每个传入记录添加到状态,并将平均值作为输出。
文档中的CountWindowAverage 示例做了类似的事情,虽然有点复杂。
推荐阅读
- ansible - 获取 Ansible 本地主机 Ipv4.address
- php - 查找数组中的第一个重复项
- python - List vs Dict 并使用 Zip:list(zip) 工作正常,但 dict(zip) 有点偏离
- java - 从数据库列填充列表视图而不重复值
- java - Python 与 Java,For 循环
- html - 如何让我插入的视频显示在我的页面上?
- c - 在 dmesg 中生成陷阱/段错误消息
- javascript - 使用 fileSaver.js 的 saveAs 方法(Javascript)损坏了 POST Rest 的响应(xls 文件)
- typescript - 离子功能在前几次有效,后来停止工作表明即使没有任何改变,它也不是一个功能
- swift - 在 Swift Playground 中使用 atan2() - Xcode 9