python - 有没有办法指定应该在每个 Airflow 操作符开始时运行的代码块?
问题描述
我们有一些设置代码,我们希望确保它们在每个操作符开始时运行。
我们现在能做的最好的事情是创建一个所有其他操作符都继承自的基本操作符,并从该操作符的预执行函数中调用设置代码。但是,我们无法强制开发人员编写的所有未来运算符都必须从该运算符继承,因此人们仍然可以编写不运行重要设置代码的运算符。
有没有办法制作一个对所有操作员都通用的预执行函数?
解决方案
我总结一下你有两个选择。选项 1 更实用、更技术化,需要补丁,但有很多好处。选项 2 是实际扩展您的第一个建议。
选项1
找到安装气流 BaseOperator 的文件源。复制它并修改文件,以便pre_execute
方法的定义具有您的功能。用于diff
生成diff
可以与文件一起应用的patch
文件。调用如果OVERRIDE-apply-library-preexecute.diff
。
然后,您需要在安装或配置步骤中加载此差异文件。正常安装气流。然后你就可以使用该patch
工具了。类似的命令patch -d <PATH> -p1 < OVERRIDE-apply-library-preexecute.diff
这具有最少的代码量,会很明显地失败(随着事情的变化),并且您不必进行单元测试。并且您可以保证每个 Operator 都会执行您的 pre_execute 方法。
选项 2
我建议您正在寻找正确的方向。
我建议将您的代码简单地实现为 Mixin,提供在您自己的代码中使用的所有运算符,并使用单元测试来确保 DAG 使用直接继承该 Mixin 的任务。
您可以按照以下方式定义您的 Mixin 类:
import logging
from airflow.models import BaseOperator
class MyPreExecuteMixin(BaseOperator):
def pre_execute(self, context):
logging.info("Global Pre Execute")
这将迫使您提供要在自己的库中使用的每个 Operator 的版本,就像这样......
from airflow.operators.dummy_operator import DummyOperator as _DummyOperator
from mylibrary.models import MyPreExecuteMixin
class DummyOperator(_DummyOperator, MyPreExecuteMixin):
pass
然后你必须有一个测试套件......
from dags.mydag import dag
def test_all_pre_execute():
non_pre_execute_tasks = [task for task in dag.tasks if type(task) != MyPreExecuteMixin]
assert not non_pre_execute_tasks
推荐阅读
- svg - 使用 SkiaSharp.Svg 从 svg 转换为 png 时添加了一些噪音
- javascript - 如何检测用户是否从当前位置滚动了 100px
- python - Pytest:在 python 中模拟/猴子修补内置 input() 和 print() 函数
- perl - 寻找棕枝主日
- javascript - JavaScript:正则表达式在文本中查找数据时间和序列
- python - 将数据从 OpenCV 加载到 CNN
- cas - 无法通过 CAS 进行身份验证
- vba - VBA - 如果列标题匹配,则跨多个工作表应用过滤器
- arrays - 在数组中选择中间值 - Swift
- c++ - C++:默认构造函数实现