python - 用于侧输入的高效 ParDo 设置或 start_bundle
问题描述
列表 A:25M 哈希
列表 B:175K 哈希
我想检查列表 B 中的每个哈希是否存在于列表 A 中。为此,我有一个 ParDo 函数,当它不匹配时我会产生。这是一个重复数据删除过程。
如何有效地设置此 ParDo,现在我在处理列表 B 时对列表 A 进行侧面输入。但是侧面输入不应该转到 ParDo 的 setup() 或 start_bundle(),因此我将查找列表 (A) 存储在工人只有一次?
class Checknewrecords(beam.DoFn):
def process(self, element, hashlist):
if element['TA_HASH'] not in hashlist:
yield element
else:
pass
如果您有答案,请附上文档链接,因为我没有找到 Python 版本的任何好的文档。
- transform_records 是来自先前转换的 PCollection
current_data 是来自 BigQuery.read 的 PCollection
新记录 = 转换记录 | 'Checknewrecords' >> beam.ParDo(Checknewrecords(), pvalue.AsList(current_data))
解决方案
对不起,我最初误解了这个问题。实际上,我认为在 start_bundle 中不可能有侧面输入。它只能在 process_bundle 中访问。但是您可以改为在第一次调用 process bundle 时执行工作并获得类似的结果。
class DoFnMethods(beam.DoFn):
def __init__(self):
self.first_element_processed = False
self.once_retrieved_side_input_data = None
def called_once(self, side_input):
if self.first_element_processed:
return
self.once_retrieved_side_input_data = side_input.get(...)
self.first_element_processed = True
def process(self, element, side_input):
self.called_once(side_input)
...
注意:您确实需要注意这样一个事实,即开始捆绑包和完成捆绑包将在所有窗口中为捆绑包调用一次,并且提供给处理的侧面输入对于每个计算的窗口都是不同的。因此,如果您正在使用 Windows,您可能需要为 self.first_element_processed 和 self.once_retrieved_side_input_data 变量使用字典(由窗口键入),以便您可以为每个窗口调用一次_onc。
推荐阅读
- django - 无法在 SubjectRoomSchedule 或 SubjectRoomSchedule 上查找“Time_From”
- python - 如何跳过无法解码的字符?
- php - Codeigniter - 发送包含异常详细信息的电子邮件
- jupyter-notebook - 使用 python 的简单分析仪表板
- python - 插入 Postgres 时 python 代码慢
- javascript - 如何在没有某种样式的情况下将文本粘贴到tinymce
- security - 使用 SecAsn1Decode 解析 DER 格式数据
- php - 使用 PHP 的 I 语句和数组
- javascript - React Hooks & UseEffect 不使用 socketIO 数据更新显示
- json - Serilog 到 MongoDB Atlas