首页 > 解决方案 > 检查 pcollection 是否为空的 Java 代码

问题描述

我正在尝试根据 pubsub 消息编写一个管道来插入/更新/删除 mysql 表。在插入特定表时,我必须检查数据是否存在于其他表中,并且仅在数据可用时才进行插入在另一张桌子上

当另一个表(PCollection)中没有数据时,我将不得不停止插入过程。

PCollection recordCount= windowedMatchedCollection.apply(Combine.globally(new CountElements()).withoutDefaults());

这条线似乎没有帮助。请对此有任何意见

标签: apache-beam

解决方案


有点不清楚您到底要做什么,但这应该可以通过计算元素来实现。例如,假设您有

# A PCollection of (table, new_row) KVs.
new_data = ...

# A PCollection of (table, old_row) KVs.
old_data = ...

然后你可以做

rows_per_old_table = old_data | beam.CombinePerKey(
    beam.combiners. CountCombineFn())

并使用它通过侧面输入过滤掉您的数据。

def maybe_filter_row(table_and_row, old_table_count):
  # table_and_row comes from the PCollection new_data
  # old_table_count is the side input as a Map
  table = table_and_row[0]
  if old_table_count.get(table) > 0:
    yield table_and_row

new_data_to_update = new_data | beam.FlatMap(
    maybe_filter_row,
    old_table_count=beam.pvalue.AsMap(rows_per_old_table))

现在,您new_data_to_update将只包含 old_data 中行数非零的表的数据。

如果您尝试以流式方式执行此操作,则必须对所有内容进行窗口化,包括old_data,并且它只会过滤掉在同一窗口中具有数据的那些内容。你可以改为做类似的事情

# Create a PCollection containing the set of tables in new_data, per window.
tables_to_consider = (
    new_data
    | beam.GroupByKey()
    | beam.MapTuple(lambda table, rows: table))

rows_per_old_table = tables_to_consider | beam.ParDo(
    SomeDoFnLookingUpCurrentSizeOfEachTable())

# Continue as before.

推荐阅读