首页 > 解决方案 > 将列表转换为 PCollection

问题描述

我目前有一个DoFn查看存储桶并查看该存储桶和 dir 前缀中的所有文件。这DoFn将返回一个列表而不是一个PCollection. 我如何将此列表转换为PCollection可以由 使用的DoFn ConvertFileNames

  # List all the files within a subdir 
  class ListBlobs(beam.DoFn):
    def start_bundle(self):
      self.storage_client = storage.Client()

    def process(self, prefix):
      bucket = self.storage_client.bucket('xxx')
      return list(self.bucket.list_blobs(prefix=prefix))

  # Convert Blobs into filenames as patterns
  class ConvertFileNames(beam.DoFn):
    def process(self, blob):
      return 'gs://' + blob.bucket.name + blob.name

标签: python-3.xgoogle-cloud-storageapache-beamdataflow

解决方案


梁文档中所述,Beam DoFn 的处理方法返回一个可迭代的元素以放置到下游 PCollection 中。所以,在你的例子中,如果我有一个前缀的 PCollection,调用它prefix_pcoll,那么我可以写

blobs_pcoll = prefix_pcoll | beam.ParDo(ListBlobs())

并将blobs_pcoll包含具有此前缀的 blob 列表(即list(self.bucket.list_blobs(prefix=prefix))所有前缀的连接)。然后你可以写

converted = blobs_pcoll | beam.ParDo(ConvertFileNames())

你也可以写

converted = blobs_pcoll | beam.Map(
    lambda blob: 'gs://' + blob.bucket.name + blob.name)

您可能还想查看apache_beam.io.fileio.MatchAll


推荐阅读