python-3.x - Apache Airflow 2 does not execute tasks after upgrading from 1.1
Problem description
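I upgraded to Airflow 2.0 (docker) today, and since then I have been unable to execute any tasks (they neither succeed nor fail; they stay stuck in the green state because of the error below).
Edit: I am editing this to highlight the differences between the vanilla docker-compose file and the changes I made myself.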
airflow_worker |
airflow_worker |
airflow_worker | airflow command error: argument GROUP_OR_COMMAND: celery subcommand works only with CeleryExecutor, your current executor: SequentialExecutor, see help above.
airflow_worker | usage: airflow [-h] GROUP_OR_COMMAND ...
airflow_worker | shows all the options here
airflow_worker | -h, --help show this help message and exit
airflow_worker exited with code 2
I used the standard docker-compose file and made a few small changes to fit my needs. Here is my docker-compose.yml:
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.1}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2:**//xxx:xxx@db/xxx**
    AIRFLOW__CELERY__RESULT_BACKEND: **db+postgresql://xxx:xxx@db/xxx**
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: '**xxx**'
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: '**false**'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _AIRFLOW_WWW_USER_USERNAME: airflow
    _AIRFLOW_WWW_USER_PASSWORD: airflow
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    **- ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./pgadmin-data:/var/lib/pgadmin**
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-1000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    db:
      condition: service_healthy
services:
  db:
    image: **postgres:latest**
    **env_file:
      - database.env**
    **volumes:
      - ./db/:/var/lib/postgresql/data**
    healthcheck:
      test: [ "CMD", "pg_isready", "-q", "-d", "airflow", "-U", "airflow" ]
      interval: 5s
      retries: 5
    restart: always
  redis:
    container_name: redis_queue
    image: redis:6.2.6-bullseye
    command: redis-server --requirepass redispass
    **env_file:
      - redis.env**
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
  airflow-webserver:
    <<: *airflow-common
    container_name: airflow_web
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  airflow-scheduler:
    <<: *airflow-common
    container_name: airflow_scheduler
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  flower:
    <<: *airflow-common
    container_name: flower_web
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  airflow-worker:
    <<: *airflow-common
    container_name: airflow_worker
    command: **celery** worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
  **airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully**
  pgadmin:
    image: dpage/pgadmin4:latest
    restart: always
    **env_file:
      - pg_admin.env**
    ports:
      - "8060:80"
    links:
      - "db:pgsql-server"
  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - .**/airflow**:/sources
volumes:
  db:
  pgadmin-data:
  dags:
  logs:
  plugins:
The error shows up every few minutes, or whenever Airflow tries to execute one of my tasks.
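Solution
I hit the same error when migrating to Airflow 2.2.3:
airflow command error: argument GROUP_OR_COMMAND: `airflow worker` command, has been removed, please use `airflow celery worker`, see help above.
Changing the entrypoint/command to the following fixed it:
airflow celery worker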
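The rename of `airflow worker` is one of several CLI changes between Airflow 1.x and 2. As a rough aid when porting old entrypoints, here is a minimal sketch that maps a few removed subcommands to their Airflow 2 form. The function name `translate_legacy_airflow_command` is hypothetical, and the list covers only a handful of renames; anything it does not know is passed through unchanged.

```shell
# Map a removed Airflow 1.x subcommand to its Airflow 2 equivalent.
# Only a few well-known renames are listed; unknown input is echoed back as-is.
translate_legacy_airflow_command() {
  case "$1" in
    worker)    echo "celery worker" ;;
    flower)    echo "celery flower" ;;
    initdb)    echo "db init" ;;
    upgradedb) echo "db upgrade" ;;
    resetdb)   echo "db reset" ;;
    *)         echo "$1" ;;
  esac
}

# Example: build the Airflow 2 command line for a legacy "worker" entrypoint.
echo "airflow $(translate_legacy_airflow_command worker)"
```

With the official image, the entrypoint appears to prepend `airflow` itself, so a compose `command: celery worker` should correspond to `airflow celery worker`.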