首页 > 解决方案 > Dataflow Bigquery-Bigquery 管道在较小的数据上执行,但不是在大型生产数据集上执行

问题描述

这里是 Dataflow 的一点点新手,但已成功创建了一个运行良好的管道。

管道从 BigQuery 读取查询,应用 ParDo(NLP 功能),然后将数据写入新的 BigQuery 表。

我要处理的数据集大约是 500GB,有 46M 记录。

当我使用相同数据的子集(大约 30 万条记录)尝试此操作时,它工作得很好并且速度很快,见下文: 在此处输入图像描述

当我尝试使用完整的数据集运行它时,它开始非常快,但随后逐渐减少并最终失败。此时作业失败并添加了大约 900k 元素,大约 6-7GB,然后元素数量实际上开始减少。

在此处输入图像描述

我正在使用 250 个工人和一个 n1-highmem-6 机器类型

在工作日志中,我得到了其中的一些(大约 10 个):

Info
2021-04-22 06:29:38.236 EDTRefreshing due to a 401 (attempt 1/2)

这是最后的警告之一:

2021-04-22 06:29:32.392 EDTS08:[85]: GetArticles/Read+[85]: GetArticles/_PassThroughThenCleanup/ParDo(PassThrough)/ParDo(PassThrough)+[85]: ExtractEntity+[85]: WriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal+[85]: WriteToBigQuery/BigQueryBatchFileLoads/AppendDestination+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/IdentityWorkaround+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Write+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(_ShardDestinations)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Write failed.

在执行细节中有多个:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

我假设这些来自数据集中较大的文本,可能需要一段时间来处理,所以稍微处理一下这些项目并开始下一个项目。

其中还有一些:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

所有这一切对我来说有点令人困惑,而且并不完全直观——即使它工作时的服务很棒。

我正在从 Jupyter 笔记本执行作业(不使用交互式运行器,只是执行脚本)。

主要管道如下:

p = beam.Pipeline()

#Create a collection from Bigquery
articles = p | "GetArticles" >> beam.io.ReadFromBigQuery(query='SELECT id,uuid, company_id_id, title, full_text, FROM `MY TABLE` ', gcs_location=dataflow_gcs_location, project='my_project',use_standard_sql=True)

#Extract entities with NLP
entities = articles | "ExtractEntity" >> beam.ParDo(EntityExtraction()) 

#Write to bigquery 
entities | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('myproject:dataset.table', schema = schema,custom_gcs_temp_location=dataflow_gcs_location, create_disposition="CREATE_IF_NEEDED",write_disposition="WRITE_APPEND")  ```

我究竟做错了什么?这是内存问题吗?我不应该像这样读写 BigQuery 而是输出到文件并从中创建表吗?希望得到一些帮助,很抱歉这篇文章很长,希望提供尽可能多的背景信息。

标签: pythongoogle-bigquerygoogle-cloud-dataflowapache-beamdataflow

解决方案


我发现 Dataflow 对于像这样的大型 NLP 批处理作业不是很好。我解决这个问题的方法是将较大的作业分成可以可靠运行的较小的作业。因此,如果您可以可靠地运行 100K 文档,则只需运行 500 个作业。


推荐阅读