首页 > 解决方案 > 获取 PCollection 中的所有元素,而不考虑标记

问题描述

我有一个 BigQuery TableRow 元素的 PCollection,这些元素的标记取决于 TableRow 的一列是否已成功解析。

final TupleTag<TableRow> OK = new TupleTag<TableRow>(){};
final TupleTag<TableRow> NOTOK = new TupleTag<TableRow>(){};

我的 ParDo 函数根据列解析标记这些 TableRow,并返回一个名为 myPCollection 的 PCollectionTuple。

我想做以下事情:

  1. 获取 PCollection 中的所有元素(标记为 OK 和 NOTOK),并将它们输出到 BigQuery。
  2. 仅获取标记为 NOTOK 的元素并将它们发送到 Pub/Sub

我知道我可以通过调用 #2

myPCollection.get(NOTOK)

我找不到做#1的方法。我看到有一个名为 myPCollection.getAll() 的方法,但它返回的不是 PCollection,而是 Map,PCollection>

关于如何获取整个元素集的任何想法,无论它们是如何标记的?

标签: google-cloud-dataflowapache-beam

解决方案


您可以使用Flatten变换(Beam guide)将不同的 PCollection 合并为一个:

PCollection<String> okResults = myPCollection.get(OK);
PCollection<String> notOkResults = myPCollection.get(NOTOK);

PCollectionList<String> pcl = PCollectionList.empty(p);
pcl = pcl.and(okResults).and(notOkResults);
PCollection<String> allResults = pcl.apply(Flatten.pCollections());

在这种情况下allResults,将同时包含 OK元素NOTOK。我做了一个例子(完整的代码在这里)有两个输入行,它们被分类为好的或坏的输出:

Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: bad line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$3 processElement
INFO: Ok element: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$4 processElement
INFO: Not Ok element: bad line

使用 2.17.0 SDK 和DirectRunner.


推荐阅读