首页 > 解决方案 > 用于包含文件名元数据的大量文件 (1M+) 的 Apache Beam/Dataflow 管道

问题描述

我一直在为 GCS 存储桶中存储 1M+ 文件的用例编写 Apache Beam/Dataflow 管道;Bigquery 行输出还需要每个文件的路径。每个输入文件都是一个单行 json 文件。

这是我当前的管道片段:

    input_file_path = 'gs://' + BUCKET + '/**'

    with beam.Pipeline(options=options) as p:
        (p | 'Reading input file' >> beam.io.ReadFromTextWithFilename(input_file_path)
         | 'Converting from json to dict' >> beam.ParDo(JSONtoDict())
         | 'Write entries into Bigquery' >> beam.io.WriteToBigQuery(
             table_spec,
             schema=table_schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
         )


所以我使用ReadFromTextWithFilename的是 Beam Python SDK。

在测试时,它按预期适用于约 1000 个 json 文件。但是,我不确定它是否适用于我计划运行此管道(通过 Google Dataflow)的大型数据集中的 >1M 文件。

在 Java SDK 中,我遇到了withHintMatchesManyFiles。我在 Python SDK 中找不到等价物,我应该切换到 Java SDK 来实现这个功能吗?

考虑到 1M+ 的输入文件,是否有另一种获取每个字符串行的文件名的方法?

标签: google-cloud-dataflowapache-beamapache-beam-io

解决方案


不知道你在犹豫什么。您能解释一下是什么让您认为它不适用于大量文件吗?

您可以使用 MatchFiles 来匹配和读取多个文件。

  readable_files = (
      pipeline
      | fileio.MatchFiles('hdfs://path/to/*.txt')
      | fileio.ReadMatches()
      | beam.Reshuffle())
  files_and_contents = (
      readable_files
      | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))

参考:

  1. https://beam.apache.org/documentation/patterns/file-processing/
  2. https://beam.apache.org/releases/pydoc/2.12.0/apache_beam.io.fileio.html

推荐阅读