首页 > 解决方案 > 有没有办法指定应该在每个 Airflow 操作符开始时运行的代码块?

问题描述

我们有一些设置代码,我们希望确保它们在每个操作符开始时运行。

我们现在能做的最好的事情是创建一个所有其他操作符都继承自的基本操作符,并从该操作符的预执行函数中调用设置代码。但是,我们无法强制开发人员编写的所有未来运算符都必须从该运算符继承,因此人们仍然可以编写不运行重要设置代码的运算符。

有没有办法制作一个对所有操作员都通用的预执行函数?

标签: pythonairflowinitializer

解决方案


我总结一下你有两个选择。选项 1 更实用、更技术化,需要补丁,但有很多好处。选项 2 是实际扩展您的第一个建议。


选项1

找到安装气流 BaseOperator 的文件源。复制它并修改文件,以便pre_execute方法的定义具有您的功能。用于diff生成diff可以与文件一起应用的patch文件。调用如果OVERRIDE-apply-library-preexecute.diff

然后,您需要在安装或配置步骤中加载此差异文件。正常安装气流。然后你就可以使用该patch工具了。类似的命令patch -d <PATH> -p1 < OVERRIDE-apply-library-preexecute.diff

这具有最少的代码量,会很明显地失败(随着事情的变化),并且您不必进行单元测试。并且您可以保证每个 Operator 都会执行您的 pre_execute 方法。


选项 2

我建议您正在寻找正确的方向。

我建议将您的代码简单地实现为 Mixin,提供在您自己的代码中使用的所有运算符,并使用单元测试来确保 DAG 使用直接继承该 Mixin 的任务。

您可以按照以下方式定义您的 Mixin 类:

import logging

from airflow.models import BaseOperator

class MyPreExecuteMixin(BaseOperator):
    def pre_execute(self, context):
        logging.info("Global Pre Execute")

这将迫使您提供要在自己的库中使用的每个 Operator 的版本,就像这样......

from airflow.operators.dummy_operator import DummyOperator as _DummyOperator
from mylibrary.models import MyPreExecuteMixin

class DummyOperator(_DummyOperator, MyPreExecuteMixin):
    pass

然后你必须有一个测试套件......

from dags.mydag import dag

def test_all_pre_execute():
    non_pre_execute_tasks = [task for task in dag.tasks if type(task) != MyPreExecuteMixin]
    assert not non_pre_execute_tasks

推荐阅读