首页 > 解决方案 > 气流重新安排错误:依赖“任务实例状态”通过:假

问题描述

我有一个定制的传感器,如下所示。这个想法是一个 dag 可以有不同的任务,可以从不同的时间开始,并利用内置的气流重新调度系统。

class MySensor(BaseSensorOperator):
    def __init__(self, *, start_time, tz, ...)
        super().__init__(**kwargs)
        self._start_time = start_time
        self._tz = tz
    
    @provide_session
    def execute(self, context, session: Session=None):
        dt_start = datetime.combine(context['next_execution_date'].date(), self._start_time)
        dt_start = dt_start.replace(tzinfo=self._tz)
        
        if datetime.now().timestamp() < dt_start.timestamp():
            dt_reschedule = datetime.utcnow().replace(tzinfo=UTC) 
            dt_reschedule += timedelta(seconds=dt_start.timestamp()-datetime.now().timestamp())
            raise AirflowRescheduleException(dt_reschedule)
        
        return super().execute(context)

在 dag 中,我有以下内容。但是,我注意到当模式为默认模式“poke”时,传感器将无法正常工作。

with DAG( schedule='0 10 * * 1-5', ... ) as dag:
    task1 = MySensor(start_time=time(14,0), mode='poke')
    task2 = MySensor(start_time=time(16,0), mode='reschedule') 
    ... ...

从日志中,我可以看到以下内容:

{taskinstance.py:1141} INFO - Rescheduling task, mark task as UP_FOR_RESCHEDULE
[5s later]
{local_task_job.py:102} INFO -  Task exited with return code 0
[14s later]
{taskinstance.py:687} DEBUG - <TaskInstance: mydag.mytask execution_date [failed]> dependency 'Task Instance State' PASSED: False, Task in in the 'failed' state which is not a valid state for execution. The task must be cleared in order to be run.
{taskinstance.py:664} INFO - Dependencies not met for <TaskInstance ... [failed]> ...

为什么重新安排不使用 mode='poke'?调度程序(?)何时将任务实例的状态从“up_for_reschedule”翻转为“失败”?在不同时间启动每个任务/传感器的更好方法?该传感器是 FileSensor 的改进版本,可以检查一堆文件或带有模式的文件。我目前的选择是使用 mode='reschedule' 强制执行每项任务

气流版本 1.10.12

标签: airflow

解决方案


推荐阅读