airflow - Airflow 2.2 timetable always throws an error: timetable is not registered
Problem description
I followed this example:
- Created the example timetable .py file and put it in $HOME/airflow/plugins
- Created the example DAG file and put it in $HOME/airflow/dags
After restarting the scheduler and the webserver, I get a DAG import error. In the web UI, the last line of the detailed error message is:
airflow.exceptions.SerializationError: Failed to serialize DAG 'example_timetable_dag2': Timetable class 'AfterWorkdayTimetable' is not registered
But if I run airflow plugins, the timetable plugin does appear in the name/source list.
How can I fix this error?
Details of plugins/AfterWorkdayTimetable.py:
from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):
    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: Optional[DataInterval],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            last_start_weekday = last_start.weekday()
            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Last run on Friday -- skip to next Monday.
                delta = timedelta(days=(7 - last_start_weekday))
            next_start = DateTime.combine((last_start + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            next_start = restriction.earliest
            if next_start is None:  # No start_date. Don't schedule.
                return None
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            elif next_start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                next_day = next_start.date() + timedelta(days=1)
                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
            next_start_weekday = next_start.weekday()
            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - next_start_weekday))
                next_start = next_start + delta
        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]
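To make the weekday rule in next_dagrun_info easier to check by hand, here is a minimal stdlib-only sketch of its "there was a previous run" branch. It is not the plugin itself: it leaves out time zones, catchup and the start/end restrictions, and the function name next_interval_start is hypothetical.

```python
from datetime import date, timedelta


def next_interval_start(last_start: date) -> date:
    """Hypothetical stand-in for the previous-run branch of next_dagrun_info."""
    weekday = last_start.weekday()  # Monday == 0 ... Sunday == 6
    if 0 <= weekday < 4:
        delta = timedelta(days=1)  # Monday-Thursday: the next interval starts tomorrow
    else:
        delta = timedelta(days=7 - weekday)  # Friday (4): skip the weekend to Monday
    return last_start + delta


if __name__ == "__main__":
    start = date(2021, 1, 4)  # a Monday
    for _ in range(6):
        print(start.isoformat(), start.strftime("%A"))
        start = next_interval_start(start)
```

Starting from Monday 2021-01-04, this prints Monday through Friday of that week and then Monday 2021-01-11, i.e. a Friday interval is followed directly by the next Monday.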
Details of dags/test_afterwork_timetable.py:
import datetime

from airflow import DAG
from AfterWorkdayTimetable import AfterWorkdayTimetable
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="example_workday_timetable",
    start_date=datetime.datetime(2021, 1, 1),
    timetable=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
) as dag:
    DummyOperator(task_id="run_this")
If I run airflow plugins, I get:
name | source
==================================+==========================================
workday_timetable_plugin | $PLUGINS_FOLDER/AfterWorkdayTimetable.py
Solution
I had a similar problem.
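You need to add an __init__.py file. Alternatively, try the following to debug the problem.
Initialize the timetable plugins and get the registered timetable classes from the plugin manager: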
from airflow import plugins_manager
plugins_manager.initialize_timetables_plugins()
plugins_manager.timetable_classes
I got this result: {'quarterly.QuarterlyTimetable': <class 'quarterly.QuarterlyTimetable'>}
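Compare your result with the class name in the exception message. If the timetable_classes dictionary registers the plugin under a different name, change the path of your plugin file so that the names match.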
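Why a path change matters: the keys of timetable_classes are made of the module name plus the qualified class name, so the same file imported under two different module names yields two different keys. The sketch below reimplements that key format locally (it mirrors what plugins_manager.as_importable_string appears to compute, judging by the quarterly.QuarterlyTimetable key above) so it runs without Airflow. It assumes, as that key also suggests, that a file in the plugins folder is registered under its file stem; the dotted name plugins.AfterWorkdayTimetable is a hypothetical alternative import path.

```python
import importlib.util
import sys
import tempfile
from importlib.machinery import SourceFileLoader
from pathlib import Path


def as_importable_string(cls: type) -> str:
    # Module name plus qualified class name, e.g. "quarterly.QuarterlyTimetable".
    return f"{cls.__module__}.{cls.__qualname__}"


def load_as(module_name: str, file_path: Path):
    """Execute file_path as a new module registered in sys.modules as module_name."""
    loader = SourceFileLoader(module_name, str(file_path))
    spec = importlib.util.spec_from_loader(module_name, loader)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    plugin_file = Path(tmp) / "AfterWorkdayTimetable.py"
    plugin_file.write_text("class AfterWorkdayTimetable:\n    pass\n")

    # Assumed plugins-folder behaviour: the module is named after the file stem.
    registered = load_as(plugin_file.stem, plugin_file).AfterWorkdayTimetable
    # Hypothetical: the DAG reaches the same file through a different module path.
    used_in_dag = load_as("plugins.AfterWorkdayTimetable", plugin_file).AfterWorkdayTimetable

registry = {as_importable_string(registered): registered}
print(sorted(registry))  # ['AfterWorkdayTimetable.AfterWorkdayTimetable']
print(as_importable_string(used_in_dag) in registry)  # False: the "not registered" case
```

Both classes come from the same source file, but only the first one's key is in the registry, which is the situation the exception message reports.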
You can also try this in the DAG Python file:
from AfterWorkdayTimetable import AfterWorkdayTimetable
from airflow import plugins_manager
print(plugins_manager.as_importable_string(AfterWorkdayTimetable))
This shows the name Airflow tries to look up in the timetable_classes dictionary.
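Once you have both sides (the timetable_classes dictionary and the printed importable string), a small hypothetical helper, not part of Airflow, can point at the mismatch by listing registered keys that end in the same class name. It works on a plain dict, so it could be given plugins_manager.timetable_classes and the output of plugins_manager.as_importable_string; the values in the example are made up.

```python
from typing import Dict, List


def same_class_name_keys(registry: Dict[str, type], dag_key: str) -> List[str]:
    """Hypothetical helper: registered keys whose last dotted part matches dag_key's."""
    class_name = dag_key.rsplit(".", 1)[-1]
    return sorted(key for key in registry if key.rsplit(".", 1)[-1] == class_name)


# Made-up values in the shape of the answerer's result and of an importable string.
registry = {"quarterly.QuarterlyTimetable": object}
dag_key = "plugins.quarterly.QuarterlyTimetable"

if dag_key in registry:
    print("registered")
else:
    # Prints: not registered; candidates: ['quarterly.QuarterlyTimetable']
    print("not registered; candidates:", same_class_name_keys(registry, dag_key))
```

A non-empty candidate list means the class is registered, just under a different module path than the one the DAG uses.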