首页 > 解决方案 > 通过 GCS 存储桶文件夹中的文件过滤并使用 Dataflow 删除 0 字节文件

问题描述

我目前正在尝试删除 Google Cloud Storage 存储桶中所有 0 字节的文件。我希望能够使用 apache 梁和将在谷歌云项目上运行的数据流运行器来做到这一点。我现在拥有的是这个(我已经隐藏了一些细节<***>):

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions

class DetectEmpty(beam.DoFn):
    def process(self, file_path):
        if gfs.size(file_path) == 0:
            yield file_path

def run(argv=None):

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', default=<***>, help='<***>')

    options = PipelineOptions()
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = '<***>'
    google_cloud_options.job_name = '<***>'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    gfs = gcs.GCSFileSystem(pipeline_options)
    p = beam.Pipeline(options=pipeline_options)

    images = p | 'read directory' >> ReadFromText(known_args.input)
    empty_images = images | 'discover empty files' >> beam.ParDo(DetectEmpty())

    p.run()

我的一些问题是:

标签: pythongoogle-cloud-storageapache-beam

解决方案


您不需要实际读取文件以检测空文件,您可以直接使用 FileSystem 对象检查文件大小并根据需要删除。match() 函数返回的 FileMetadata 对象包括文件的大小。

就像是

class DeleteEmpty(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes == 0:
      gfs.delete([file_metadata.path])

files = p | 'Filenames' >> beam.Create(gfs.match([<directory glob pattern>]).metadata_list)
          | 'Reshuffle' >> beam.Reshuffle() # this allows the downstream code to be parallelized after the Create
          | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs))

GCS 并没有真正的文件夹。它们只是在使用 UI 或 gsutil 时添加的一种便利。当文件夹中没有对象时,该文件夹就不存在了。请参阅https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork


推荐阅读