google-cloud-dataflow - 如何在 Apache Beam Pipeline 中处理大量内存数据以在 Google Dataflow Runner 上运行
问题描述
我有一个简单的以下代码。内存中变量的大小word_to_id
约为 50MB。这会导致将管道提交到 Dataflow Runner 时出错。
413请求实体太大
word_to_id = {tok: idx for idx, tok in enumerate(vocab)}
def extract_word_ids(tokens):
return [word_to_id[w] for w in tokens if word_to_id.get(w, None)]
with beam.pipeline.Pipeline(
options=get_pipeline_option()) as p:
lines = p | 'Read' >> beam.io.ReadFromText(path)
word_ids = (
lines
| 'TokenizeLines' >> beam.Map(words)
| 'IntergerizeTokens' >> beam.Map(extract_word_ids)
)
请为此提供一个替代解决方案。
解决方案
您可以将 GCS 存储桶用作文本和变量的源,并将变量用作side input
. 您可以将此侧输入用作列表、字典或单例。
在这里,您有一个删除停用词的字数示例,这些停用词存储在 GCS 存储桶中
with beam.Pipeline() as p:
path = "gs://dataflow-samples/shakespeare/kinglear.txt"
stopwords_path = "<BUCKET/stopwords>"
output_path = "<BUCKET>"
def split_words(text, stopwords):
words = re.split('\W+', text)
try:
words.remove('')
except:
pass
return [x for x in words if x.lower() not in stopwords]
stopwords_p = (p | "Read Stop Words" >> ReadFromText(stopwords_path)
| FlatMap(lambda x: x.split(", ")))
text = p | "Read Text" >> ReadFromText(path)
(text | "Split Words" >> FlatMap(split_words, stopwords=beam.pvalue.AsList(stopwords_p))
| "Count" >> Count.PerElement()
| "Write" >> WriteToText(file_path_prefix=output_path, file_name_suffix=".txt"))
推荐阅读
- nginx - 禁用 nginx 的 index.html
- google-bigquery - 从其他 Google Cloud 服务流式传输数据时,是否可以修复失败的 BigQuery 插入?
- sendfile - FileChannel.transferTo(据说是零拷贝)没有带来任何性能提升
- python - twython获取api错误码
- c# - 带有 OnKeyDown 和其他控件的 Winforms
- javascript - 用对象方法深度克隆一个类对象?
- python - Pandas 移动列,但添加非移动列为空的新行
- go - dep init 上的“不在已知的 GOPATH/src 中”错误
- javascript - 如何使用材料下拉菜单获取所选值?
- actions-on-google - TRANSACTION_REQUIREMENTS_CHECK 不起作用