Airflow's BeamRunPythonPipelineOperator fails to install py_requirements

Problem description

My Dockerfile extends the official image:

FROM apache/airflow:2.1.1-python3.8

USER root
RUN apt-get update \
  && apt-get install -y --no-install-recommends \
         build-essential gcc git \
  && apt-get autoremove -yqq --purge \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

USER airflow

My requirements.txt:

apache-airflow[amazon,docker,slack,google,postgres,kubernetes]==2.1.0
google-cloud-bigquery[bqstorage,pandas]==2.13.1
apache-airflow-providers-apache-beam~=3.0.0

And the task in my DAG (imports shown for completeness):

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

tfdv_operator = BeamRunPythonPipelineOperator(
    task_id="run_tfdv_on_dataflow",
    py_file="gs://my-datasets/tfdv/beam_pipeline_tfdv.py",     
    pipeline_options={         
        "train_stats_output_path": f"gs://my-datasets/tfdv/train_stats.tfrecord",
        "test_stats_output_path": f"gs://my-datasets/tfdv/test_stats.tfrecord",
    },
    py_options=[],
    py_requirements=[
        "apache-beam[gcp]==2.31.0",
        "pyarrow==2.0.0",
        "tensorflow-data-validation==1.1.0",
        "tfx-bsl==1.1.1",
    ],
    py_interpreter="python3.8",
    py_system_site_packages=False,
    runner="DataflowRunner",
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id="my-research",
        location="asia-northeast3",
    ),
    default_pipeline_options={
        "temp_location": "gs://my-datasets/tfdv/dataflow_job_tmp/",
        "staging_location": "gs://my-datasets/tfdv/dataflow_job_staging/", 
    },
)

I run the DAG, but the task fails. Logs:

*** Log file does not exist: /opt/airflow/logs/tfdv_dag/run_tfdv_on_dataflow/2021-08-15T06:30:51.833077+00:00/1.log
*** Fetching from: http://airflow-worker-0.airflow-worker.airflow-playground.svc.cluster.local:8793/log/tfdv_dag/run_tfdv_on_dataflow/2021-08-15T06:30:51.833077+00:00/1.log

[2021-08-15 06:30:53,088] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [queued]>
[2021-08-15 06:30:53,103] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [queued]>
[2021-08-15 06:30:53,103] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-08-15 06:30:53,103] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-08-15 06:30:53,104] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-08-15 06:30:53,131] {taskinstance.py:1087} INFO - Executing <Task(BeamRunPythonPipelineOperator): run_tfdv_on_dataflow> on 2021-08-15T06:30:51.833077+00:00
[2021-08-15 06:30:53,136] {standard_task_runner.py:52} INFO - Started process 2304 to run task
[2021-08-15 06:30:53,140] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'tfdv_dag', 'run_tfdv_on_dataflow', '2021-08-15T06:30:51.833077+00:00', '--job-id', '679', '--pool', 'default_pool', '--raw', '--subdir', '/opt/***/dags/repo/dags/tfdv_dag.py', '--cfg-path', '/tmp/tmp5wikr_ja', '--error-file', '/tmp/tmpsq7cctia']
[2021-08-15 06:30:53,141] {standard_task_runner.py:77} INFO - Job 679: Subtask run_tfdv_on_dataflow
[2021-08-15 06:30:53,290] {logging_mixin.py:104} INFO - Running <TaskInstance: tfdv_dag.run_tfdv_on_dataflow 2021-08-15T06:30:51.833077+00:00 [running]> on host ***-worker-0.***-worker.***-playground.svc.cluster.local
[2021-08-15 06:30:53,402] {taskinstance.py:1280} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=tfdv_dag
AIRFLOW_CTX_TASK_ID=run_tfdv_on_dataflow
AIRFLOW_CTX_EXECUTION_DATE=2021-08-15T06:30:51.833077+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-08-15T06:30:51.833077+00:00
[2021-08-15 06:30:53,779] {gcs.py:310} INFO - File downloaded to /tmp/tmptqjahx3vbeam_pipeline_tfdv.py
[2021-08-15 06:30:53,779] {process_utils.py:135} INFO - Executing cmd: virtualenv /tmp/apache-beam-venvborjpiov --python=python3.8
[2021-08-15 06:30:53,787] {process_utils.py:139} INFO - Output:
[2021-08-15 06:30:54,085] {process_utils.py:143} INFO - created virtual environment CPython3.8.10.final.0-64 in 177ms
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO -   creator CPython3Posix(dest=/tmp/apache-beam-venvborjpiov, clear=False, no_vcs_ignore=False, global=False)
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO -   seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/home/***/.local/share/virtualenv)
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO -     added seed packages: pip==21.1.3, setuptools==57.2.0, wheel==0.36.2
[2021-08-15 06:30:54,086] {process_utils.py:143} INFO -   activators BashActivator,CShellActivator,FishActivator,PowerShellActivator,PythonActivator,XonshActivator
[2021-08-15 06:30:54,099] {process_utils.py:135} INFO - Executing cmd: /tmp/apache-beam-venvborjpiov/bin/pip install 'apache-beam[gcp]==2.31.0' pyarrow==2.0.0 tensorflow-data-validation==1.1.0 tfx-bsl==1.1.1
[2021-08-15 06:30:54,107] {process_utils.py:139} INFO - Output:
[2021-08-15 06:30:54,824] {process_utils.py:143} INFO - ERROR: Can not perform a '--user' install. User site-packages are not visible in this virtualenv.
[2021-08-15 06:30:54,993] {process_utils.py:143} INFO - WARNING: You are using pip version 21.1.3; however, version 21.2.4 is available.
[2021-08-15 06:30:54,993] {process_utils.py:143} INFO - You should consider upgrading via the '/tmp/apache-beam-venvborjpiov/bin/python -m pip install --upgrade pip' command.
[2021-08-15 06:30:55,056] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/operators/beam.py", line 242, in execute
    self.beam_hook.start_python_pipeline(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/apache/beam/hooks/beam.py", line 243, in start_python_pipeline
    py_interpreter = prepare_virtualenv(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/python_virtualenv.py", line 98, in prepare_virtualenv
    execute_in_subprocess(pip_cmd)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/process_utils.py", line 147, in execute_in_subprocess
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/apache-beam-venvborjpiov/bin/pip', 'install', 'apache-beam[gcp]==2.31.0', 'pyarrow==2.0.0', 'tensorflow-data-validation==1.1.0', 'tfx-bsl==1.1.1']' returned non-zero exit status 1.
[2021-08-15 06:30:55,058] {taskinstance.py:1524} INFO - Marking task as FAILED. dag_id=tfdv_dag, task_id=run_tfdv_on_dataflow, execution_date=20210815T063051, start_date=20210815T063053, end_date=20210815T063055
[2021-08-15 06:30:55,113] {local_task_job.py:151} INFO - Task exited with return code 1

It looks closely related to ERROR: Can not perform a '--user' install. User site-packages are not visible in this virtualenv, but I don't know how to deal with it.

One thing worth noting: when I run exactly the same DAG on my local machine (Mac OS X) with no code changes (via airflow dags test my_dag 2021-01-01), it works fine.

How can I fix this?

Tags: airflow, google-cloud-dataflow, apache-beam

Solution


Change the py_system_site_packages option to True. This makes the system site-packages visible inside the virtualenv that the operator creates, and the error goes away. See the operator's documentation for a detailed explanation.
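
For reference, a minimal sketch of the task with only that flag flipped; every other argument stays exactly as in the question (the "..." comments stand in for them):

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

tfdv_operator = BeamRunPythonPipelineOperator(
    task_id="run_tfdv_on_dataflow",
    py_file="gs://my-datasets/tfdv/beam_pipeline_tfdv.py",
    py_requirements=[
        "apache-beam[gcp]==2.31.0",
        "pyarrow==2.0.0",
        "tensorflow-data-validation==1.1.0",
        "tfx-bsl==1.1.1",
    ],
    py_interpreter="python3.8",
    # The fix: the virtualenv is created with --system-site-packages,
    # so the user site-packages become visible and pip no longer
    # refuses the --user install.
    py_system_site_packages=True,
    runner="DataflowRunner",
    # ... pipeline_options, dataflow_config and default_pipeline_options
    # unchanged from the question ...
)

This also explains why the same DAG works on Mac OS X but not in the container: the most likely culprit is that the official apache/airflow image configures pip for --user installs (via the PIP_USER environment variable), the virtualenv spawned by the operator inherits that setting, and with py_system_site_packages=False the user site-packages are invisible, so pip aborts. On a plain local installation that pip configuration is absent. One caveat: with py_system_site_packages=True the virtualenv also sees every package already installed in the image, which can interact with the versions pinned in py_requirements.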

