首页 > 解决方案 > 在 DoFn 中下载文件

问题描述

目前尚不清楚在DoFn.

DoFn将下载一个约 20MB 的文件(一个 ML 模型)以应用于我的管道中的元素。根据 Beam 文档,要求包括可串行化和线程兼容性。

一个例子 ( 1 , 2 ) 与我的非常相似DoFn。它演示了从 GCP 存储桶下载(正如我正在使用 DataflowRunner 所做的那样),但我不确定这种方法是否安全。

是否应该将对象下载到内存字节缓冲区而不是下载到磁盘,或者对于这个用例是否有另一种最佳实践?我还没有遇到这种模式的最佳实践方法。

标签: google-cloud-dataflowapache-beam

解决方案


补充这个答案。

如果您的模型数据是静态的,那么您可以使用下面的代码示例将您的模型作为侧面输入传递。

#DoFn to open the model from GCS location
class get_model(beam.DoFn):
    def process(self, element):
        from apache_beam.io.gcp import gcsio
        logging.info('reading model from GCS')
        gcs = gcsio.GcsIO()
        yield gcs.open(element)


#Pipeline to load pickle file from GCS bucket
model_step = (p
              | 'start' >> beam.Create(['gs://somebucket/model'])
              | 'load_model' >> beam.ParDo(get_model())
              | 'unpickle_model' >> beam.Map(lambda bin: dill.load(bin)))

#DoFn to predict the results.
class predict(beam.DoFn):
    def process(self, element, model):
        (features, clients) = element
        result = model.predict_proba(features)[:, 1]
        return [(clients, result)]

#main pipeline to get input and predict results.
_ = (p
     | 'get_input' >> #get input based on source and preprocess it.
     | 'predict_sk_model' >> beam.ParDo(predict(), beam.pvalue.AsSingleton(model_step))
     | 'write' >> #write output based on target.

在流式管道的情况下,如果您想在预定义的时间后再次加载模型,您可以在此处检查“缓慢变化的查找缓存”模式。


推荐阅读