airflow - 气流重新安排错误:依赖“任务实例状态”通过:假
问题描述
我有一个定制的传感器,如下所示。这个想法是一个 dag 可以有不同的任务,可以从不同的时间开始,并利用内置的气流重新调度系统。
class MySensor(BaseSensorOperator):
def __init__(self, *, start_time, tz, ...)
super().__init__(**kwargs)
self._start_time = start_time
self._tz = tz
@provide_session
def execute(self, context, session: Session=None):
dt_start = datetime.combine(context['next_execution_date'].date(), self._start_time)
dt_start = dt_start.replace(tzinfo=self._tz)
if datetime.now().timestamp() < dt_start.timestamp():
dt_reschedule = datetime.utcnow().replace(tzinfo=UTC)
dt_reschedule += timedelta(seconds=dt_start.timestamp()-datetime.now().timestamp())
raise AirflowRescheduleException(dt_reschedule)
return super().execute(context)
在 dag 中,我有以下内容。但是,我注意到当模式为默认模式“poke”时,传感器将无法正常工作。
with DAG( schedule='0 10 * * 1-5', ... ) as dag:
task1 = MySensor(start_time=time(14,0), mode='poke')
task2 = MySensor(start_time=time(16,0), mode='reschedule')
... ...
从日志中,我可以看到以下内容:
{taskinstance.py:1141} INFO - Rescheduling task, mark task as UP_FOR_RESCHEDULE
[5s later]
{local_task_job.py:102} INFO - Task exited with return code 0
[14s later]
{taskinstance.py:687} DEBUG - <TaskInstance: mydag.mytask execution_date [failed]> dependency 'Task Instance State' PASSED: False, Task in in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
{taskinstance.py:664} INFO - Dependencies not met for <TaskInstance ... [failed]> ...
为什么重新安排不使用 mode='poke'?调度程序(?)何时将任务实例的状态从“up_for_reschedule”翻转为“失败”?在不同时间启动每个任务/传感器的更好方法?该传感器是 FileSensor 的改进版本,可以检查一堆文件或带有模式的文件。我目前的选择是使用 mode='reschedule' 强制执行每项任务
气流版本 1.10.12
解决方案
推荐阅读
- stm32 - STM32F4077 未启动 PLL(未设置 PLLRDY 位)
- importerror - PyAudio导入错误:ImportError:DLL加载失败:找不到指定的模块
- android-studio - 我在 android studio 上运行默认的颤振代码,这个错误出现了关于许可协议的内容
- javascript - 当另一个元素在视图中时更改一个元素的 CSS
- java - 使用在 LINUX 中运行的 windows 换行符生成 xml 签名
- javascript - 使用 apache 之类的 Web 服务器在线导出我的本地主机服务器(节点 js)
- python-3.x - 运行 opencv 示例时出现“功能/功能未实现”错误
- c# - foreach 循环仅获取最后一条记录到 gridview c# asp.net
- node-opcua - 使用“AddIn”引用在命名空间中添加一个对象
- tensorflow - 在 IPU 模型上运行 TensorFlow 程序会引发“非法指令(核心转储)”错误