首页 > 解决方案 > 气流调度程序故障

问题描述

我已按照 教程尝试使用我自己的 DAG 在 localhost 上构建气流集群。当我在配置文件airflow scheduler中设置后运行时executor = CeleryExecutor,我收到以下回溯:

回溯(最近一次通话最后):

文件“/home/yurii/Tools/anaconda3/bin/airflow”,第 28 行,在 args.func(args)

调度程序 job.run() 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py”,第 839 行

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 200 行,运行 self._execute()

_execute self._execute_helper(processor_manager) 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 1309 行

_execute_helper self.executor.heartbeat() 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 1441 行

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py”,第 124 行,心跳 self.execute_async(key, command=command, queue=queue)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py”,第 80 行,在 execute_async args=[command], queue=queue)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py”,第 573 行,在 apply_async **dict(self._get_exec_options(), **options)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py”,第 354 行,在 send_task reply_to=reply_to 或 self.oid 中,**options

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py”,第 310 行,在 publish_task **kwargs

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第 172 行,在发布 routing_key,强制,立即,交换,声明)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py”,第 449 行,在 _ensured return fun(*args, **kwargs)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第 188 行,_publish 强制=强制,立即=立即,

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init .py”,第 122 行, basic_publish强制或 False,立即或 False,

TypeError:需要一个整数(获取类型 NoneType)

一些附加信息:

我是 Airflow/Celery/RabbitMQ/SQL 的新手,因此我们将不胜感激!

标签: celeryairflow-scheduler

解决方案


添加到以前的答案。使用 py-amqp 涉及从更改broker_url = amqp://XXXXXbroker_url = pyamqp://XXXXXOR pip uninstall librabbitmq

此外,您可能需要将celery_result_backend变量result_backend更改为airflow.cfg. 在最近的版本celery_中,已删除[celery]节点中变量的前缀。airflow.cfg


推荐阅读