java - 如何使用 Apache Flink 在字数统计中避免重复的键元组
问题描述
我正在尝试使用 Apache Flink 编写一个简单的字数统计程序,因为我正在学习它。
问题是我无法摆脱结果中的重复键元组。
输入:
a
aaa
ab
aaa
a
a
输出:
(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)
预期输出:
(a,3)
(aaa,2)
(ab, 1)
我的代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("data.in");
DataStream<Tuple2<String, Integer>> counts = text
.map(s -> Tuple2.of(s, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);
counts.print();
env.execute();
}
解决方案
Flink 的流式 API 并非旨在产生您期望的结果。相反,流处理背后的想法是输入可能是无界的——换句话说,输入将永远持续地到达。实际上,是的,输入可能会终止,但话又说回来,也许不会。
由于 Flink 不期望流输入将永远终止,因此不能指望等到结束才产生结果。相反,Flink 的 DataStream API 是围绕连续输入产生连续结果的思想组织的。每个新的输入事件都可能产生更新的结果。
然而,有一种方法可以完成您想要的,同时仍然使用 DataStream API,但它有点复杂。
事实证明,当您将 Flink 与有界输入源(如文件)一起使用时,当它到达该有界输入的末尾时,会通过作业图发送一个信号,指示已到达末尾。事实上,您可以等待这个信号,然后才能产生结果。
我说的这个信号实际上是一个水印,其值为MAX_WATERMARK。因此,您可以做的是让 ProcessFunction 为遥远的将来的某个时间点设置一个事件时间计时器。此计时器仅在出现此特殊水印时才会触发。同时,这个 ProcessFunction 应该监视流,跟踪最新的结果(对于每个键)——只有当这个计时器在收到这个非常大的水印时最终触发时,它才会收集到输出。
或者你可以只使用 Flink 的 DataSet API,它是围绕批处理进行组织的。然后你会得到你所期望的。
推荐阅读
- python - 数据库中的外键
- statistics - 用于 T 检验的对照组/测试组的最佳样本量
- c++ - 一个类是否可以拥有将当前类作为参数的成员
- python - 使用win32 python将excel中的一列数据格式化为百分比
- python - Python + Selenium 从滚动框中选择 href
- excel - Excel - 分配每个值的索引是一个排序数组
- javascript - 在 CSS 更改上更改 JS
- laravel - 如果用户在 laravel 中分配了多个角色,我如何使用单个角色登录?
- windows - 如何在 Windows 7 上为帮助台用户授予 Active Directory 中的只读访问权限?
- amazon-web-services - cognito 中的每个身份角色