首页 > 解决方案 > 如何使用 Apache Flink 在字数统计中避免重复的键元组

问题描述

我正在尝试使用 Apache Flink 编写一个简单的字数统计程序,因为我正在学习它。

问题是我无法摆脱结果中的重复键元组。

输入:

a
aaa
ab
aaa
a
a

输出:

(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)

预期输出:

(a,3)
(aaa,2)
(ab, 1)

我的代码:

public static void main(String[] args) throws Exception {
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  DataStream<String> text = env.readTextFile("data.in");

  DataStream<Tuple2<String, Integer>> counts = text
    .map(s -> Tuple2.of(s, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0)
    .sum(1);

  counts.print();
  env.execute();
}

标签: javaapache-flinkword-count

解决方案


Flink 的流式 API 并非旨在产生您期望的结果。相反,流处理背后的想法是输入可能是无界的——换句话说,输入将永远持续地到达。实际上,是的,输入可能会终止,但话又说回来,也许不会。

由于 Flink 不期望流输入将永远终止,因此不能指望等到结束才产生结果。相反,Flink 的 DataStream API 是围绕连续输入产生连续结果的思想组织的。每个新的输入事件都可能产生更新的结果。

然而,有一种方法可以完成您想要的,同时仍然使用 DataStream API,但它有点复杂。

事实证明,当您将 Flink 与有界输入源(如文件)一起使用时,当它到达该有界输入的末尾时,会通过作业图发送一个信号,指示已到达末尾。事实上,您可以等待这个信号,然后才能产生结果。

我说的这个信号实际上是一个水印,其值为MAX_WATERMARK。因此,您可以做的是让 ProcessFunction 为遥远的将来的某个时间点设置一个事件时间计时器。此计时器仅在出现此特殊水印时才会触发。同时,这个 ProcessFunction 应该监视流,跟踪最新的结果(对于每个键)——只有当这个计时器在收到这个非常大的水印​​时最终触发时,它才会收集到输出。

或者你可以只使用 Flink 的 DataSet API,它是围绕批处理进行组织的。然后你会得到你所期望的。


推荐阅读