Google protobuf and multithreading in Python

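Problem description

How can I use Google Protocol Buffers in a multiprocessing script?

My use case is:

AdWords campaign wrapper object

I have an existing process for the old AdWords API in which I pull data and store it in custom classes, for example: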

class Campaign(Represantable):
    def __init__(self, id, managed_customer_id, base_campaign_id, name, status, serving_status):
        self.id = id
        self.managed_customer_id = managed_customer_id
        self.base_campaign_id = base_campaign_id
        self.name = name
        self.status = status
        self.serving_status = serving_status

    @classmethod
    def from_zeep(cls, campaign, managed_customer_id):
        return cls(
            campaign.id,
            managed_customer_id,
            campaign.baseCampaignId,
            campaign.name,
            campaign.status,
            campaign.servingStatus
        )

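Multiprocessing script

If I want to pull campaigns from a dozen accounts, I can run a script that populates the Campaign objects in parallel using pathos (again, the code is simplified for this example):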

import multiprocessing as mp
from pathos.pools import ProcessPool

class WithParallelism(object):
    def __init__(self, parallelism_level):
        self.parallelism_level = parallelism_level

    def _parallel_apply(self, fn, collection, **kwargs):
        pool = ProcessPool(
            nodes=self.parallelism_level
        )
        
        # this is to prevent Python from printing large traces when user interrupts execution (e.g. Ctrl+C)
        def keyboard_interrupt_wrapper_fn(*args_wrapped):
            try:
                return fn(*args_wrapped, **kwargs)
            except KeyboardInterrupt:
                pass
            except Exception as err:
                return err

        errors = pool.map(keyboard_interrupt_wrapper_fn, collection)

        return errors
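
Because the wrapper above returns exceptions instead of raising them, the list produced by pool.map mixes normal return values, exception instances, and None (when a worker is interrupted). A minimal sketch of how that list could be split afterwards; `split_outcomes` is a hypothetical helper, not part of pathos or of the original script:

```python
def split_outcomes(outcomes):
    """Split the mixed list returned by the wrapped map call.

    Hypothetical helper: exception instances go to `errors`; None is skipped
    because it means a worker was interrupted (or fn returned nothing);
    everything else is treated as a normal result.
    """
    results, errors = [], []
    for outcome in outcomes:
        if isinstance(outcome, Exception):
            errors.append(outcome)
        elif outcome is not None:
            results.append(outcome)
    return results, errors


if __name__ == "__main__":
    results, errors = split_outcomes([1, ValueError("boom"), None, "ok"])
    print(len(results), "results,", len(errors), "errors")
```

Note that a function which legitimately returns None cannot be told apart from an interrupted call with this approach.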

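Google Ads campaign wrapper object

With the Google Ads API, I plan to store the protobuf object in my class and use a pointer to access the object's attributes. My class is much more complex than this example, using descriptors for attributes and a subclass init, but for simplicity it is effectively this: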

class Campaign(Proto):
    def __init__(self, **kwargs):
        if "proto" in kwargs:
            self._proto = kwargs['proto']
        if "parent" in kwargs:
            self._parent = kwargs['parent']
        self._init_metadata(**kwargs)


    @property
    def id(self):
        return self._proto.id.value

    @property
    def name(self):
        return self._proto.name.value

    ...

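This has the added advantage of being able to traverse the parent Google Ads object to pull data from that protobuf object.

However, when I run the script to populate these new objects in parallel, I get pickle errors. I understand that multiprocess uses pickle to serialize objects, and one of the main advantages of protobuf objects is that they can be easily serialized.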
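
One way to combine the two ideas, assuming the pickle error comes from the wrapped message itself, is to have the wrapper pickle the message as its serialized bytes. The sketch below is not the Google Ads client: `FakeProto` is a hypothetical stand-in that only mimics the `SerializeToString()` and `FromString()` methods of protobuf messages, and `proto_class` is an assumed attribute naming the message type to rebuild.

```python
import pickle
import threading


class FakeProto:
    """Hypothetical stand-in for a protobuf message (not the real library)."""

    def __init__(self, id, name):
        self.id = id
        self.name = name
        self._lock = threading.Lock()  # a lock makes default pickling fail

    def SerializeToString(self):
        # Toy wire format for the sketch: b"<id>|<name>"
        return f"{self.id}|{self.name}".encode("utf-8")

    @classmethod
    def FromString(cls, data):
        id_text, name = data.decode("utf-8").split("|", 1)
        return cls(int(id_text), name)


class Campaign:
    proto_class = FakeProto  # assumed: the message type to rebuild on unpickle

    def __init__(self, proto):
        self._proto = proto

    @property
    def id(self):
        return self._proto.id

    @property
    def name(self):
        return self._proto.name

    def __getstate__(self):
        # Replace the message with its serialized bytes before pickling.
        state = self.__dict__.copy()
        state["_proto"] = self._proto.SerializeToString()
        return state

    def __setstate__(self, state):
        # Rebuild the message from bytes on the receiving side.
        state["_proto"] = self.proto_class.FromString(state["_proto"])
        self.__dict__.update(state)


if __name__ == "__main__":
    campaign = Campaign(FakeProto(42, "Spring sale"))
    restored = pickle.loads(pickle.dumps(campaign))
    print(restored.id, restored.name)
```

With hooks like these, the wrapper rather than the message decides what crosses the process boundary. Whether this removes the error in the real script depends on what actually fails to pickle there.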

How should I go about pulling the new Google Ads data in parallel?

Tags: python, multithreading, protocol-buffers, google-ads-api

Solution
