Apache Beam: ReadFromText versus ReadAllFromText

Question

I'm running an Apache Beam pipeline that reads text files from Google Cloud Storage, performs some parsing on those files, and writes the parsed data to BigQuery.

For the sake of brevity, and ignoring the parsing and google_cloud_options here, my code is as follows (apache-beam 2.5.0 with the GCP add-ons and Dataflow as the runner):

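import apache_beam as beam

# `options` holds the pipeline options (google_cloud_options omitted here).
p = beam.Pipeline(options=options)

lines = (
    p
    | 'read from file' >> beam.io.ReadFromText('some_gcs_bucket_path*')
    # XmlToDictFn is the parsing DoFn (omitted here).
    | 'parse xml to dict' >> beam.ParDo(XmlToDictFn())
    | 'write to bigquery' >> beam.io.WriteToBigQuery(
        'my_table',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

p.run()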

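This runs fine and successfully appends the relevant data to my BigQuery table for a small number of input files. However, when I increase the number of input files to roughly 800k, I get the error: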

"Total size of the BoundedSource objects returned by BoundedSource.split() operation is larger than the allowable limit."

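I found "Troubleshooting apache beam pipeline import errors [BoundedSource objects is larger than the allowable limit]", which recommends using ReadAllFromText rather than ReadFromText.
However, when I swap it in, I get the following error: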

Traceback (most recent call last):
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 240, in <module>
    xmltobigquery.run_dataflow()
  File "/Users/richardtbenade/Repos/de_020/main_isolated.py", line 220, in run_dataflow
    'parse xml to dict' >> beam.ParDo(XmlToDictFn(), job_spec=self.job_spec) | \
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 831, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 488, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/textio.py", line 470, in expand
    return pvalue | 'ReadAllFiles' >> self._read_all_files
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/io/filebasedsource.py", line 416, in expand
    | 'ReadRange' >> ParDo(_ReadRange(self._source_from_file)))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 454, in apply
    label or transform.label)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 464, in apply
    return self.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 568, in expand
    | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 109, in __or__
    return self.pipeline.apply(ptransform, self)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pipeline.py", line 500, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 187, in apply
    return m(transform, input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply_PTransform
    return transform.expand(input)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/util.py", line 494, in expand
    windowing_saved = pcoll.windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/pvalue.py", line 130, in windowing
    self.producer.inputs)
  File "/Users/richardtbenade/virtualenvs/de_020/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
AttributeError: 'PBegin' object has no attribute 'windowing'. 

Any suggestions?

Tags: google-cloud-platform, apache-beam, dataflow

Answer


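I was facing the same issue. As Richardt mentioned, beam.Create has to be called explicitly: ReadAllFromText expects a PCollection of file patterns as its input, so it cannot be applied directly to the pipeline object. An additional challenge is how to use this pattern together with template parameters, because beam.Create only supports in-memory data, as described in its documentation.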

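Google Cloud Support helped me with the template case, and I want to share the solution with you. The trick is to start the pipeline from a dummy string, and then use a Map lambda to read the real input parameter at runtime: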

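import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class AggregateOptions(PipelineOptions):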
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Path of the files to read from')
        parser.add_value_provider_argument(
            '--output',
            help='Output files to write results to')

def run():
    logging.info('Starting main function')

    pipeline_options = PipelineOptions()
    pipeline = beam.Pipeline(options=pipeline_options)
    options = pipeline_options.view_as(AggregateOptions)

    steps = (
            pipeline
            | 'Create' >> beam.Create(['Start'])  # workaround to kickstart the pipeline
            | 'Read Input Parameter' >> beam.Map(lambda x: options.input.get())  # get the real input param
            | 'Read Data' >> beam.io.ReadAllFromText()
            # | ... other steps
    )

I hope this answer is helpful.

