google-cloud-dataflow - 获取 PCollection 中的所有元素,而不考虑标记
问题描述
我有一个 BigQuery TableRow 元素的 PCollection,这些元素的标记取决于 TableRow 的一列是否已成功解析。
final TupleTag<TableRow> OK = new TupleTag<TableRow>(){};
final TupleTag<TableRow> NOTOK = new TupleTag<TableRow>(){};
我的 ParDo 函数根据列解析标记这些 TableRow,并返回一个名为 myPCollection 的 PCollectionTuple。
我想做以下事情:
- 获取 PCollection 中的所有元素(标记为 OK 和 NOTOK),并将它们输出到 BigQuery。
- 仅获取标记为 NOTOK 的元素并将它们发送到 Pub/Sub
我知道我可以通过调用 #2
myPCollection.get(NOTOK)
我找不到做#1的方法。我看到有一个名为 myPCollection.getAll() 的方法,但它返回的不是 PCollection,而是 Map,PCollection>
关于如何获取整个元素集的任何想法,无论它们是如何标记的?
解决方案
您可以使用Flatten
变换(Beam guide)将不同的 PCollection 合并为一个:
PCollection<String> okResults = myPCollection.get(OK);
PCollection<String> notOkResults = myPCollection.get(NOTOK);
PCollectionList<String> pcl = PCollectionList.empty(p);
pcl = pcl.and(okResults).and(notOkResults);
PCollection<String> allResults = pcl.apply(Flatten.pCollections());
在这种情况下allResults
,将同时包含和 OK
元素NOTOK
。我做了一个例子(完整的代码在这里)有两个输入行,它们被分类为好的或坏的输出:
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: bad line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$3 processElement
INFO: Ok element: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$4 processElement
INFO: Not Ok element: bad line
使用 2.17.0 SDK 和DirectRunner
.
推荐阅读
- docker - 我应该如何使用主机而不是服务名称连接到容器?
- visual-studio-code - 如何在 VSCode 中更改智能感知框的字体大小?
- python - 将数据框转换为 json 文件
- php - 如何使用逗号分隔的 id 修复多个下拉列表?
- python - 在不使用熊猫的情况下,我需要从列表输出中删除某些字符
- javascript - 如何使用电子-dl
- prolog - 列表中 findall/3 的数学运算问题(Prolog)
- python - 字符串的 endswith() 函数给出了矛盾的结果
- java - JavaFX 应用程序在找到 ServerSocket.accept() 方法时没有响应
- powershell - 如何在运行时通过输入/输出保持 PowerShell 进程打开?