How to run an Apache Beam data pipeline from Apache Airflow with the DataflowPythonOperator

Problem description

I have implemented the DataflowPythonOperator in my Airflow DAG, and when it runs I get the following error:

[2019-06-12 07:04:27,988] {{models.py:1595}} INFO - Executing <Task(DataFlowPythonOperator): task_run_pipeline> on 2019-05-01T04:10:00+00:00
[2019-06-12 07:04:27,989] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run example_with_dataflow task_run_pipeline 2019-05-01T04:10:00+00:00 --job_id 57 --raw -sd DAGS_FOLDER/example_with_dataflow.py --cfg_path /tmp/tmp1fjmyili']
[2019-06-12 07:04:32,437] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:32,436] {{settings.py:174}} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-06-12 07:04:35,107] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:35,105] {{__init__.py:51}} INFO - Using executor LocalExecutor
[2019-06-12 07:04:39,188] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:39,186] {{models.py:271}} INFO - Filling up the DagBag from /usr/local/airflow/dags/example_with_dataflow.py
[2019-06-12 07:04:39,861] {{base_task_runner.py:101}} INFO - Job 57: Subtask task_run_pipeline [2019-06-12 07:04:39,860] {{cli.py:484}} INFO - Running <TaskInstance: example_with_dataflow.task_run_pipeline 2019-05-01T04:10:00+00:00 [running]> on host 10b352a0858c
[2019-06-12 07:04:40,086] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:40,086] {{discovery.py:272}} INFO - URL being requested: GET https://www.googleapis.com/discovery/v1/apis/storage/v1/rest
[2019-06-12 07:04:40,895] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:40,894] {{discovery.py:873}} INFO - URL being requested: GET https://www.googleapis.com/storage/v1/b/example-airflow-test/o/example%2Fprocess_details.py?alt=media
[2019-06-12 07:04:42,079] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,079] {{gcp_dataflow_hook.py:120}} INFO - Running command: python /tmp/dataflow58bcc900-process_details.py --runner=DataflowRunner --project=data_example --zone=us-central1 --temp_location=gs://path/temp --staging_location=gs://path/staging --labels=airflow-version=v1-10-1 --job_name=task-run-pipeline-59ed310c --region=us-central1
[2019-06-12 07:04:42,130] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,130] {{gcp_dataflow_hook.py:151}} INFO - Start waiting for DataFlow process to complete.
[2019-06-12 07:04:42,391] {{logging_mixin.py:95}} INFO - [2019-06-12 07:04:42,391] {{gcp_dataflow_hook.py:132}} WARNING - b'Traceback (most recent call last):\n  File "/tmp/dataflow58bcc900-process_details.py", line 7, in <module>\n    import apache_beam as beam\nModuleNotFoundError: No module named \'apache_beam\''
[2019-06-12 07:04:42,392] {{models.py:1760}} ERROR - DataFlow failed with return code 1

It looks like there is a problem with the installed dependencies. If I run the Beam data pipeline on its own, it works fine with the DataflowRunner on GCP. To run the Apache Airflow webserver I am using the Puckel docker-compose setup. Your insights/suggestions would be much appreciated! Thanks :)
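
For context, the task in the log corresponds roughly to a DAG like the sketch below. The task id, py_file path and pipeline options are taken from the log above; the schedule and start date are assumptions. As the gcp_dataflow_hook.py lines show, the operator downloads the pipeline file from GCS and executes it with the Airflow worker's own python interpreter, which is where the missing apache_beam module surfaces.

# Rough sketch of the failing setup, assuming Airflow 1.10.x contrib operators.
# Schedule and start_date are invented; other values come from the log above.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

with DAG(
    dag_id="example_with_dataflow",
    start_date=datetime(2019, 5, 1),
    schedule_interval="@daily",
) as dag:

    task_run_pipeline = DataFlowPythonOperator(
        task_id="task_run_pipeline",
        # The hook downloads this file from GCS to /tmp and runs it with the
        # worker's local Python, so apache_beam must be importable there.
        py_file="gs://example-airflow-test/example/process_details.py",
        options={
            "project": "data_example",
            "zone": "us-central1",
            "temp_location": "gs://path/temp",
            "staging_location": "gs://path/staging",
        },
        gcp_conn_id="google_cloud_default",
    )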

Tags: python, docker, google-cloud-dataflow, airflow, apache-beam

Solution


You can create Dataflow templates and then trigger those templates from Airflow/Cloud Composer. Because the pipeline is staged when the template is built, triggering it only calls the Dataflow API, so dependency problems like this one do not arise.
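
As a hedged sketch of that approach: assuming the pipeline has already been staged as a template (for example by running process_details.py once with a --template_location flag pointing at a GCS path, which does require apache_beam but can be done outside the Airflow containers), the DAG only launches the template through the Dataflow API, e.g. with the contrib DataflowTemplateOperator shipped with Airflow 1.10.x. The template path and option values below are placeholders.

# Sketch: launching a pre-built Dataflow template from Airflow 1.10.x.
# The template location and default options are hypothetical placeholders.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator

with DAG(
    dag_id="example_with_dataflow_template",
    start_date=datetime(2019, 5, 1),
    schedule_interval="@daily",
) as dag:

    task_run_pipeline = DataflowTemplateOperator(
        task_id="task_run_pipeline",
        # GCS path of the template staged earlier with --template_location.
        template="gs://path/templates/process_details",
        dataflow_default_options={
            "project": "data_example",
            "tempLocation": "gs://path/temp",
        },
        # Runtime parameters exposed by the template via ValueProvider, if any.
        parameters={},
        gcp_conn_id="google_cloud_default",
    )

Because the template already bundles the pipeline graph and its staged dependencies, the Airflow worker never imports apache_beam; it only makes an API call to start the job.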

