首页 > 解决方案 > 用于侧输入的高效 ParDo 设置或 start_bundle

问题描述

列表 A:25M 哈希
列表 B:175K 哈希

我想检查列表 B 中的每个哈希是否存在于列表 A 中。为此,我有一个 ParDo 函数,当它不匹配时我会产生。这是一个重复数据删除过程。

如何有效地设置此 ParDo,现在我在处理列表 B 时对列表 A 进行侧面输入。但是侧面输入不应该转到 ParDo 的 setup() 或 start_bundle(),因此我将查找列表 (A) 存储在工人只有一次?

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
        else:
            pass

如果您有答案,请附上文档链接,因为我没有找到 Python 版本的任何好的文档。

标签: pythongoogle-cloud-dataflowapache-beam

解决方案


对不起,我最初误解了这个问题。实际上,我认为在 start_bundle 中不可能有侧面输入。它只能在 process_bundle 中访问。但是您可以改为在第一次调用 process bundle 时执行工作并获得类似的结果。

class DoFnMethods(beam.DoFn):
  def __init__(self):
    self.first_element_processed = False
    self.once_retrieved_side_input_data = None

  def called_once(self, side_input):
    if self.first_element_processed:
      return
    self.once_retrieved_side_input_data = side_input.get(...)
    self.first_element_processed = True

  def process(self, element, side_input):
    self.called_once(side_input)
    ...

注意:您确实需要注意这样一个事实,即开始捆绑包和完成捆绑包将在所有窗口中为捆绑包调用一次,并且提供给处理的侧面输入对于每个计算的窗口都是不同的。因此,如果您正在使用 Windows,您可能需要为 self.first_element_processed 和 self.once_retrieved_side_input_data 变量使用字典(由窗口键入),以便您可以为每个窗口调用一次_onc。


推荐阅读