Airflow DockerOperator timeout

Problem description

I am new to Airflow. I am trying to run a container through Airflow, but I get a timeout error:

[2021-07-21 07:02:06,176] {docker.py:231} INFO - Starting docker container from image python:3.9.2-slim
[2021-07-21 07:03:06,171] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 426, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 421, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/local/lib/python3.6/http/client.py", line 1379, in getresponse
    response.begin()
  File "/usr/local/lib/python3.6/http/client.py", line 311, in begin
    version, status, reason = self._read_status()
  File "/usr/local/lib/python3.6/http/client.py", line 272, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/local/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 727, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/util/retry.py", line 410, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/packages/six.py", line 735, in reraise
    raise value
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 677, in urlopen
    chunked=chunked,
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 428, in _make_request
    self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
  File "/home/airflow/.local/lib/python3.6/site-packages/urllib3/connectionpool.py", line 336, in _raise_timeout
    self, url, "Read timed out. (read timeout=%s)" % timeout_value
urllib3.exceptions.ReadTimeoutError: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 319, in execute
    return self._run_image()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/providers/docker/operators/docker.py", line 258, in _run_image
    tty=self.tty,
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 430, in create_container
    return self.create_container_from_config(config, name)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/container.py", line 440, in create_container_from_config
    res = self._post_json(u, data=config, params=params)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 296, in _post_json
    return self._post(url, data=json.dumps(data2), **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/docker/api/client.py", line 233, in _post
    return self.post(url, **self._set_request_timeout(kwargs))
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 590, in post
    return self.request('POST', url, data=data, json=json, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 542, in request
    resp = self.send(prep, **send_kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/sessions.py", line 655, in send
    r = adapter.send(request, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/requests/adapters.py", line 529, in send
    raise ReadTimeout(e, request=request)
requests.exceptions.ReadTimeout: HTTPConnectionPool(host='host.docker.internal', port=2375): Read timed out. (read timeout=60)
[2021-07-21 07:03:06,179] {taskinstance.py:1551} INFO - Marking task as UP_FOR_RETRY. dag_id=etl_in_ch, task_id=etl_in_ch, execution_date=20210721T070203, start_date=20210721T070205, end_date=20210721T070306
[2021-07-21 07:03:06,215] {local_task_job.py:149} INFO - Task exited with return code 1

I am on a Mac, and I configured the Docker socket as described here: https://github.com/puckel/docker-airflow/issues/543#issuecomment-741842728

My Airflow code is:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.docker.operators.docker import DockerOperator
from docker.types import Mount

# Defaults applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,  # was misspelled 'depend_on_past'
    'start_date': datetime(2021, 7, 19),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'etl_in_ch',
    description='Extract data from different sources into CH and train model with it',
    default_args=default_args,
    schedule_interval="00 23 * * *",
    catchup=False,
) as dag:
    start_dag = DummyOperator(task_id='start_dag')
    end_dag = DummyOperator(task_id='end_dag')

    t1 = DockerOperator(
        task_id='etl_in_ch',
        image='python:3.9.2-slim',
        container_name='etl_in_ch',
        api_version='auto',
        auto_remove=True,
        # '&&' only works when a shell interprets the command, so run it via bash -c.
        command=[
            'bash', '-c',
            'apt-get update && apt-get install -y cron && apt-get install -y libxml2 libxslt-dev wget bzip2 gcc '
            '&& pip install --no-cache-dir --upgrade pip '
            '&& pip install --no-cache-dir poetry==1.1.5 '
            '&& poetry config virtualenvs.create false '
            '&& poetry install --no-interaction --no-ansi '
            '&& chmod +x /src/__main__.py '
            '&& python __main__.py',
        ],
        docker_url="tcp://host.docker.internal:2375",
        network_mode="bridge",
        environment={"PYTHONDONTWRITEBYTECODE": 1, "PYTHONUNBUFFERED": 1},
        working_dir="/usr/src/copy_data",
        # mounts takes docker.types.Mount objects. Bind-mount sources must be absolute
        # paths on the Docker host; the '/path/to' prefix is a placeholder, and the
        # pyproject.toml target is assumed to be the working directory.
        mounts=[
            Mount(source='/path/to/CH_ETL/src', target='/usr/src/copy_data', type='bind'),
            Mount(source='/path/to/pyproject.toml', target='/usr/src/copy_data/pyproject.toml', type='bind'),
        ],
        xcom_all=True,
    )

    start_dag >> t1 >> end_dag

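I have read that I probably need to increase the Docker timeout, but I do not know exactly where. I have already tried it on my machine, inside the Airflow worker, and inside the bobrik/socat container. None of it helped.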

Tags: python, docker, airflow, dockeroperator

Solution

The comment on the Puckel image issue shows a rather complicated solution.

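If you have a local Docker that you want to use from within the container, I think a much better solution is to switch to the official Apache Airflow image (https://airflow.apache.org/docs/docker-stack/index.html) and use the Docker-in-Docker approach, where you map the Docker socket into the container.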
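As a rough sketch of what the socket mapping means for the operator: once the host socket is mapped into the Airflow container, docker_url can point at that Unix socket instead of a TCP proxy. The helper below is hypothetical (its name and the fallback argument are not part of Airflow); it only assumes the socket is mapped at /var/run/docker.sock, the path DockerOperator uses by default.

```python
import os
import stat


def resolve_docker_url(socket_path="/var/run/docker.sock",
                       fallback_url="tcp://host.docker.internal:2375"):
    """Return a docker_url: the mapped Unix socket if it exists, else the fallback."""
    try:
        mode = os.stat(socket_path).st_mode
    except OSError:
        # Nothing is mapped at that path, so keep using the TCP endpoint.
        return fallback_url
    if stat.S_ISSOCK(mode):
        return f"unix://{socket_path}"
    return fallback_url
```

The result could then be passed as docker_url=resolve_docker_url() when constructing the DockerOperator.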

If you have a remote Docker engine, it is better to specify the remote Docker engine URL explicitly.
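Before pointing DockerOperator at a TCP endpoint, it may help to check from inside the Airflow worker that the endpoint answers at all. The sketch below uses only the standard library and the Docker Engine API's /_ping endpoint; the function name docker_ping is made up for illustration.

```python
import http.client


def docker_ping(host, port, timeout=5.0):
    """Return True if GET /_ping on the Docker Engine API answers 200 in time."""
    conn = http.client.HTTPConnection(host, port, timeout=timeout)
    try:
        conn.request("GET", "/_ping")
        return conn.getresponse().status == 200
    except (OSError, http.client.HTTPException):
        # Covers refused connections, DNS failures, timeouts, and bad replies.
        return False
    finally:
        conn.close()
```

For the setup in the question this would be called as docker_ping("host.docker.internal", 2375). A successful ping only shows that the endpoint is reachable; it does not prove that the container-creation request in the traceback would also finish within 60 seconds.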

You need to make sure you have the right permissions (see for example https://forums.docker.com/t/docker-daemon-access-within-docker-as-non-root-user-permission-denied-while-trying-to-connect-to-docker-daemon-socket/94181) or run Airflow as the root user (I think you can do that as of version 2.0.2).
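To see whether permissions are the problem, one option is to inspect the socket from inside the container as the user Airflow runs under. This is a minimal diagnostic sketch; the function name and the returned keys are illustrative only, and the default path assumes the socket is mapped at /var/run/docker.sock.

```python
import os
import stat


def docker_socket_access(path="/var/run/docker.sock"):
    """Describe whether the current process can use the Docker socket at path."""
    try:
        info = os.stat(path)
    except OSError as exc:
        return {"exists": False, "error": str(exc)}
    return {
        "exists": True,
        "is_socket": stat.S_ISSOCK(info.st_mode),
        "owner_uid": info.st_uid,
        "group_gid": info.st_gid,
        "mode": stat.filemode(info.st_mode),
        # Talking to the daemon needs both read and write access to the socket.
        "read_write": os.access(path, os.R_OK | os.W_OK),
        "running_as_root": os.geteuid() == 0,
    }
```

If read_write is False and running_as_root is False, the group_gid value shows which group the Airflow user would need to belong to.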

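Note that the recent 2.0.0 release of the Docker provider for Airflow (https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/index.html) has a bug that generally prevents you from running DockerOperator through either Docker-in-Docker or a remote URL. You can either use the previous version of the provider or wait for the fix coming in 2.1.0: https://github.com/apache/airflow/pull/16932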
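A quick way to tell whether an environment has the affected provider is to read the installed version of the apache-airflow-providers-docker distribution. The sketch below assumes the affected range is 2.0.0 up to, but not including, 2.1.0, which is my reading of the note above rather than a confirmed range. It also needs Python 3.8 or newer for importlib.metadata, whereas the traceback in the question comes from Python 3.6.

```python
from importlib import metadata


def parse_version(text):
    """Turn '2.0.0' into (2, 0, 0); parsing stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    while len(parts) < 3:
        parts.append(0)  # pad short versions such as '2.1' to three components
    return tuple(parts)


def provider_is_affected(version):
    """Assumed affected range: 2.0.0 <= version < 2.1.0."""
    return (2, 0, 0) <= parse_version(version) < (2, 1, 0)


def installed_provider_version(dist="apache-airflow-providers-docker"):
    """Return the installed version string, or None if the package is absent."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return None
```

Calling provider_is_affected(installed_provider_version() or "0") inside the worker gives a yes/no answer without importing Airflow itself.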

