首页 > 解决方案 > Airflow + Celery + Flower - 任务名称不是 Python 运算符的任务 ID

问题描述

我目前正在使用 CeleryExecutor 提交 DAG,并在 AWS 上的同一容器中运行 Web 服务器、调度程序和工作程序。当我提交 DAG 以运行并且它们的任务在 Redis 队列中(使用 Elasticache)时,Celery 工作人员执行任务,但名称只是airflow.executors.celery_executor.execute_command. 我如何让这些成为参数?在此处输入图像描述

目前这些是重要的部分airflow.cfg

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

在我们的 DAG 文件中:

dag = DAG(
    'foo_dag',
    schedule_interval='0 2-23 * * *',
    catchup=False,
    default_args=default_args
)

t1 = PythonOperator(
    task_id="bar_task",
    provide_context=True,
    python_callable=bar_task,
    op_kwargs={"emr_cluster_id": emr_cluster_id},
    task_concurrency=1,
    dag=foo_dag
)

标签: airflowairflow-schedulerflower

解决方案


推荐阅读