airflow - Airflow + Celery + Flower - 任务名称不是 Python 运算符的任务 ID
问题描述
我目前正在使用 CeleryExecutor 提交 DAG,并在 AWS 上的同一容器中运行 Web 服务器、调度程序和工作程序。当我提交 DAG 以运行并且它们的任务在 Redis 队列中(使用 Elasticache)时,Celery 工作人员执行任务,但名称只是airflow.executors.celery_executor.execute_command
. 我如何让这些成为参数?
目前这些是重要的部分airflow.cfg
:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor
executor = CeleryExecutor
[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
在我们的 DAG 文件中:
dag = DAG(
'foo_dag',
schedule_interval='0 2-23 * * *',
catchup=False,
default_args=default_args
)
t1 = PythonOperator(
task_id="bar_task",
provide_context=True,
python_callable=bar_task,
op_kwargs={"emr_cluster_id": emr_cluster_id},
task_concurrency=1,
dag=foo_dag
)
解决方案
推荐阅读
- java - 从Java中具有不同数据类型的单个方法返回多个变量
- java - java - 用分隔符理解字节数组
- java - “单例是每个进程和每个类加载器”是什么意思?
- c# - 使用 DayOfWeek 提高挑战效率
- android - 使用 soong 的 AOSP 构建失败
- python - Python http requests.exceptions.ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))
- python - 我无法在 python 3.7.7 中用 pygame 画一个圆圈
- python - 为什么值不设置为带有 loc 的熊猫数据框中的列
- angular - 仅从 algolia 获取 8 条记录
- python - 为什么在初始化新进程时出现 NameError?