java - Apache Beam 独特元素的数量
问题描述
我有一个 Apache Beam 作业,它从 PubSub 注入数据然后加载到 BigQuery,我将 PubSub 消息转换为带有字段的 pojo
身份证、姓名、人数
计数是指非唯一元素在单个摄取中的计数。
如果我从 PubSub 3 个元素加载,其中两个是相同的,那么我需要加载到 BigQuery 2 个元素中,其中一个的计数为 2。
我想知道在 Apache Beam 中实现它是多么容易。我试图通过 DoFn 或 MapElements 制作它,但我只能处理单个元素。我也尝试将元素转换为 KV,然后计数,但我有非确定性编码器。
在通常的 java 应用程序中,我可以简单地使用 equals 或通过 Map,但在 Apache Beam 中,一切都不同。
解决方案
简单而正确的方法是使用Count.<T>perElement()
,如下所示:
Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
PCollection<KV<T, Long>> elementsCounts =
elements.apply(Count.<T>perElement());
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
new FormatOutputFn()));
虽然,对,你需要有一个确定性的元素编码器。因此,如果不是这种情况(正如我从您上面所说的那样),您需要在将Count
元素转换为不同的表示之前添加一个步骤,以便有可能具有确定性编码器(例如AvroCoder
,例如)。
如果由于某些原因不可能,那么另一种解决方法可能是为每个元素计算一个 uniq 哈希(但哈希值也必须是确定性的),KV
为每个元素创建一个新哈希作为 aKey
和元素作为 aValue
并使用GroupByKey
下游具有相同值的分组元组。
另外,请注意,由于PubSub
是无限源,您需要通过任何类型的Windows
策略(除了Global
一种)“窗口化”您的输入,因为您的所有组/组合操作都应该在一个窗口内完成。看看WindowedWordCount作为类似问题的解决方案示例。
推荐阅读
- gradle - 在 Gradle protobuf 插件中排除特定文件
- excel - 脱离上下文并选择列中的填充值
- php - Laravel Eloquent 加入 IN 子句
- bilby - 从文件设置功率谱密度
- python - 可以从 FTP 存储 getvalue() 并存储到数据帧 Python
- python - 如何检查 random.choice 的打印值是否与“猜测”变量匹配
- c# - Asp.net MVC 表单远程验证未通过异步 api 调用执行
- c# - 源代码中的 .net 核心连接字符串
- python-3.x - Python Selenium Vanguard:NoSuchElementException:没有这样的元素:无法使用 Selenium 和 Python 定位元素
- php - 为什么这次foreach参数无效,而前一次有效