How can I persist externally fetched state data in apache-beam Python?

Problem description

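In my apache-beam job I call an external source, GCP Storage. This can be treated as a general-purpose HTTP call; the important part is that it is an external call made to enrich the data.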

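For every piece of data I process, I call this API to fetch some information to enrich it. There are a large number of repeated calls to the API for the same data.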

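Is there a good way to cache or store the results so they can be reused for each piece of data processed, to limit the network traffic required? This is a huge bottleneck in processing.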

Tags: apache-beam

Solution


You could consider keeping this value as instance state on the DoFn. For example:

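import apache_beam as beam


# Note: some_api_call is a placeholder for the external request.
class MyDoFn(beam.DoFn):
    def __init__(self):
        # This will be called during pipeline construction, and the
        # resulting instance is pickled and sent to the workers.
        self.value1 = some_api_call()

    def setup(self):
        # This will be called once for each DoFn instance (generally
        # once per worker), good for non-pickleable stuff that won't change.
        self.value2 = some_api_call()
        # Create the cache here so that process() can use it. A plain dict
        # is unbounded; swap in a real LRU cache if memory is a concern.
        self.some_lru_cache = {}

    def start_bundle(self):
        # This will be called per-bundle, possibly many times on a worker.
        self.value3 = some_api_call()

    def process(self, element):
        # This is called on each element.
        key = ...
        # Only hit the external API the first time a key is seen.
        if key not in self.some_lru_cache:
            self.some_lru_cache[key] = some_api_call()
        value4 = self.some_lru_cache[key]
        # Use self.value1, self.value2, self.value3 and/or value4 here.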
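
The some_lru_cache used in process() above is left unspecified. The following is a minimal sketch of one way it could look, using only the standard library. The names BoundedLRUCache and fake_api_call are hypothetical helpers introduced for illustration and are not part of Apache Beam; fake_api_call stands in for the real external request.

```python
from collections import OrderedDict


class BoundedLRUCache:
    """Small LRU cache. Hypothetical helper, not part of Apache Beam."""

    def __init__(self, fetch, max_size=1000):
        self._fetch = fetch            # callable that performs the external lookup
        self._max_size = max_size      # maximum number of entries to keep
        self._entries = OrderedDict()  # ordered from least to most recently used
        self.misses = 0                # number of times fetch was actually called

    def get(self, key):
        if key in self._entries:
            # Cache hit: mark the key as most recently used.
            self._entries.move_to_end(key)
            return self._entries[key]
        # Cache miss: call the external source once and remember the result.
        self.misses += 1
        value = self._fetch(key)
        self._entries[key] = value
        if len(self._entries) > self._max_size:
            # Evict the least recently used entry.
            self._entries.popitem(last=False)
        return value


def fake_api_call(key):
    # Stand-in for the real external request (assumption for this sketch).
    return key.upper()


cache = BoundedLRUCache(fake_api_call, max_size=2)
results = [cache.get(k) for k in ["a", "b", "a", "c", "b"]]
```

Under these assumptions, a DoFn could create such a cache in setup() and call self.some_lru_cache.get(key) in process(), so that each worker only contacts the external source for keys it has not seen recently. The cache is local to each DoFn instance, so different workers may still fetch the same key independently.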
