首页 > 解决方案 > Apache Beam 独特元素的数量

问题描述

我有一个 Apache Beam 作业,它从 PubSub 注入数据然后加载到 BigQuery,我将 PubSub 消息转换为带有字段的 pojo

身份证、姓名、人数

计数是指非唯一元素在单个摄取中的计数。

如果我从 PubSub 3 个元素加载,其中两个是相同的,那么我需要加载到 BigQuery 2 个元素中,其中一个的计数为 2。

我想知道在 Apache Beam 中实现它是多么容易。我试图通过 DoFn 或 MapElements 制作它,但我只能处理单个元素。我也尝试将元素转换为 KV,然后计数,但我有非确定性编码器。

在通常的 java 应用程序中,我可以简单地使用 equals 或通过 Map,但在 Apache Beam 中,一切都不同。

标签: javaetlapache-beam

解决方案


简单而正确的方法是使用Count.<T>perElement(),如下所示:

Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
PCollection<KV<T, Long>> elementsCounts =
    elements.apply(Count.<T>perElement());
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
    new FormatOutputFn()));

虽然,对,你需要有一个确定性的元素编码器。因此,如果不是这种情况(正如我从您上面所说的那样),您需要在将Count元素转换为不同的表示之前添加一个步骤,以便有可能具有确定性编码器(例如AvroCoder,例如)。

如果由于某些原因不可能,那么另一种解决方法可能是为每个元素计算一个 uniq 哈希(但哈希值也必须是确定性的),KV为每个元素创建一个新哈希作为 aKey和元素作为 aValue并使用GroupByKey下游具有相同值的分组元组。

另外,请注意,由于PubSub是无限源,您需要通过任何类型的Windows策略(除了Global一种)“窗口化”您的输入,因为您的所有组/组合操作都应该在一个窗口内完成。看看WindowedWordCount作为类似问题的解决方案示例。


推荐阅读