首页 > 解决方案 > 在 GCS 存储桶和 Dataflow VM 之间读取和写入文件

问题描述

我正在尝试从 GCS 存储桶中读取文件(路径:gs://bucket_name),并将其加载到 Dataflow VM 文件夹(路径/tmp/文件名)。

我还需要将另一个文件从 Dataflow VM 文件夹复制回 GCS 存储桶。

我已经尝试过 apache_beam.io.gcp.gcsio 库,但它似乎不起作用。

任何人都可以对此提出任何建议吗?

标签: google-cloud-dataflowapache-beam

解决方案


最好的方法是使用调用GCS Python APIDoFn的方法触发自定义。DoFn 可以通过将元素发送到. 它可以由 Impulse(仅执行一次)或 PCollection(PCollection 中的每个元素执行)触发。在此处查看下载/上传 blob 和此处查看 GCS Python 客户端库文档。processDoFn

import apache_beam as beam
from google.cloud import storage

p = beam.Pipeline(...)
impulse = p | beam.Impulse()

class ReadWriteToGcs(beam.DoFn):
  def setup(self, e):
    self.client = storage.Client()

  def process(self, e):
    bucket = self.client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    blob.upload_from_filename(source_file_name)

推荐阅读