google-cloud-dataflow - 在 GCS 存储桶和 Dataflow VM 之间读取和写入文件
问题描述
我正在尝试从 GCS 存储桶中读取文件(路径:gs://bucket_name),并将其加载到 Dataflow VM 文件夹(路径/tmp/文件名)。
我还需要将另一个文件从 Dataflow VM 文件夹复制回 GCS 存储桶。
我已经尝试过 apache_beam.io.gcp.gcsio 库,但它似乎不起作用。
任何人都可以对此提出任何建议吗?
解决方案
最好的方法是使用调用GCS Python APIDoFn
的方法触发自定义。DoFn 可以通过将元素发送到. 它可以由 Impulse(仅执行一次)或 PCollection(PCollection 中的每个元素执行)触发。在此处查看下载/上传 blob 和此处查看 GCS Python 客户端库文档。process
DoFn
import apache_beam as beam
from google.cloud import storage
p = beam.Pipeline(...)
impulse = p | beam.Impulse()
class ReadWriteToGcs(beam.DoFn):
def setup(self, e):
self.client = storage.Client()
def process(self, e):
bucket = self.client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
blob.download_to_filename(destination_file_name)
blob.upload_from_filename(source_file_name)
推荐阅读
- assembly - When is the zero flag set?
- arrays - Ajax 响应在数据表中显示为单个字母
- php - Having issue with .htacces file and get method in php
- php - 删除数组foreach mysql中的重复项
- c# - Setting fixed major grid marks independent of data range
- regex - 为什么 Unicode 字符 'MINUS SIGN' (U+2212) 不在正则表达式 unicode 组 \p{Pd} (Dash_Punctuation) 中?
- mysql - How to get all children of the children of the specific parent
- javascript - What is the point of having multiple values after a return statement?
- d3.js - Positioning the text straight in horizontal stacked bar in d3
- java - Spring Data Jpa - How to perform roll back?