How to handle large in-memory data in an Apache Beam pipeline running on the Google Dataflow Runner

Problem description

I have the following simple code. The in-memory variable word_to_id is about 50 MB in size, which causes an error when the pipeline is submitted to the Dataflow Runner:

413 Request Entity Too Large

  word_to_id = {tok: idx for idx, tok in enumerate(vocab)}

  def extract_word_ids(tokens):
    # word_to_id (~50 MB) is captured in this function's closure
    return [word_to_id[w] for w in tokens if w in word_to_id]

  with beam.pipeline.Pipeline(
    options=get_pipeline_option()) as p:
    lines = p | 'Read' >> beam.io.ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntegerizeTokens' >> beam.Map(extract_word_ids)
    )

Please suggest an alternative solution for this.

Tags: google-cloud-dataflow, apache-beam

Solution


You can use a GCS bucket as the source for both the text and the variable, and pass the variable to the pipeline as a side input. A side input can be consumed as a list, a dict, or a singleton.
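
For the word_to_id dict from the question, the dict view is the natural fit. Below is a minimal sketch, not part of the original answer: it assumes the vocabulary has been written to a GCS file at vocab_path (a hypothetical placeholder) with one token and its id per line, separated by a tab, and it reuses path, words and get_pipeline_option from the question.

import apache_beam as beam
from apache_beam.io import ReadFromText

vocab_path = "<BUCKET/vocab.tsv>"  # hypothetical: one "token<TAB>id" pair per line

def extract_word_ids(tokens, word_to_id):
    # word_to_id arrives on each worker as a plain Python dict built from the side input
    return [word_to_id[w] for w in tokens if w in word_to_id]

with beam.Pipeline(options=get_pipeline_option()) as p:
    # Read the vocabulary from GCS and turn it into (token, id) pairs;
    # beam.pvalue.AsDict materializes those pairs as a dict side input.
    word_to_id_p = (
        p
        | 'ReadVocab' >> ReadFromText(vocab_path)
        | 'ParseVocab' >> beam.Map(lambda line: line.split('\t'))
        | 'ToPairs' >> beam.Map(lambda kv: (kv[0], int(kv[1])))
    )

    lines = p | 'Read' >> ReadFromText(path)

    word_ids = (
        lines
        | 'TokenizeLines' >> beam.Map(words)
        | 'IntegerizeTokens' >> beam.Map(
            extract_word_ids, word_to_id=beam.pvalue.AsDict(word_to_id_p))
    )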

Here you also have a word-count example that removes stop words stored in a GCS bucket, passing them in as a list side input:

import re

import apache_beam as beam
from apache_beam import FlatMap
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:

    path = "gs://dataflow-samples/shakespeare/kinglear.txt"
    stopwords_path = "<BUCKET/stopwords>"
    output_path = "<BUCKET>"

    def split_words(text, stopwords):
        # Split on non-word characters and drop any empty string produced by the split.
        words = re.split(r'\W+', text)
        try:
            words.remove('')
        except ValueError:
            pass
        # Keep only words that are not in the stop-word side input.
        return [x for x in words if x.lower() not in stopwords]

    # Stop words read from GCS; the file contains a comma-separated list.
    stopwords_p = (p | "Read Stop Words" >> ReadFromText(stopwords_path)
                     | FlatMap(lambda x: x.split(", ")))

    text = p | "Read Text" >> ReadFromText(path)

    (text | "Split Words" >> FlatMap(split_words, stopwords=beam.pvalue.AsList(stopwords_p))
          | "Count" >> Count.PerElement()
          | "Write" >> WriteToText(file_path_prefix=output_path, file_name_suffix=".txt"))
