celery - 气流调度程序故障
问题描述
我已按照
本教程尝试使用我自己的 DAG 在 localhost 上构建气流集群。当我在配置文件airflow scheduler
中设置后运行时executor = CeleryExecutor
,我收到以下回溯:
回溯(最近一次通话最后):
文件“/home/yurii/Tools/anaconda3/bin/airflow”,第 28 行,在 args.func(args)
调度程序 job.run() 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py”,第 839 行
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 200 行,运行 self._execute()
_execute self._execute_helper(processor_manager) 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 1309 行
_execute_helper self.executor.heartbeat() 中的文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第 1441 行
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py”,第 124 行,心跳 self.execute_async(key, command=command, queue=queue)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py”,第 80 行,在 execute_async args=[command], queue=queue)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py”,第 573 行,在 apply_async **dict(self._get_exec_options(), **options)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py”,第 354 行,在 send_task reply_to=reply_to 或 self.oid 中,**options
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py”,第 310 行,在 publish_task **kwargs
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第 172 行,在发布 routing_key,强制,立即,交换,声明)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py”,第 449 行,在 _ensured return fun(*args, **kwargs)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第 188 行,_publish 强制=强制,立即=立即,
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init .py”,第 122 行, basic_publish强制或 False,立即或 False,
TypeError:需要一个整数(获取类型 NoneType)
一些附加信息:
- 我使用 Airflow 1.8.0 以及 Celery 3.1.25 和 RabbitMQ 3.5.7 作为代理和后端,但也尝试使用 Airflow 1.9.0 和 Celery 4.2。
- 带有顺序执行器的气流可以正常工作。
- `airflow test " dag_name " " task_name " " exec_date " 运行成功。
我是 Airflow/Celery/RabbitMQ/SQL 的新手,因此我们将不胜感激!
解决方案
添加到以前的答案。使用 py-amqp 涉及从更改broker_url = amqp://XXXXX
为broker_url = pyamqp://XXXXX
OR
pip uninstall librabbitmq
。
此外,您可能需要将celery_result_backend
变量result_backend
更改为airflow.cfg
. 在最近的版本celery_
中,已删除[celery]
节点中变量的前缀。airflow.cfg
推荐阅读
- javascript - TypeError:更新后无法读取未定义的属性“任何”
- java - 从 CSV 文件读取时出现 NumberFormatException 错误
- spring - Spring MVC Jackson 消息转换器:“冲突的设置器定义”
- php - 在 PHP 脚本中使用 sys_get_temp_dir 将文件保存在临时目录中
- elasticsearch - ELK-Stack:使用 Logstash 仅解析来自 Syslog 的 IP/MAC
- c# - 通过System.js加载JS时如何通过onclick调用函数
- swift - 当字形的矩形 Y 值与前一个矩形显着距离时,不执行`contentOffset`
- python - 基于现有 DataFrame 和条件运算符创建新的 Pandas DataFrame
- css - Laravel 路径导致填充问题
- swift - 如何显示来自弹出视图控制器的数据