airflow-scheduler - 重新处理 Airflow 的历史数据
问题描述
我正在将一些管道迁移到气流中。我希望能够在特定时间范围内针对历史负载运行一些 DAG,并且我正在探索我的选择。注意:我不想重新执行以前的运行(例如过去 10 天),但我希望能够随时根据 last_loaded 时间戳变量(例如 2017-12-09 00:00:00.000000)重新加载数据需要(甚至在创建 DAG 之前)。此变量也用于外部调用 API。
我脑子里总共有4个概念:
当前的 dag 运行实现了通过元数据数据库中的 xcom 表交换此变量。尽管每次我想修改它时,我都必须更新数据类型为 blob 的字段。我什至不确定这是否可能。
将此参数保存在其他位置。易于实施的解决方案,但我不想重新发明轮子。如果气流总是实现某些功能,我想探索它。
气流变量:到目前为止,可能没有最受认可的气流概念,但我确实觉得这就是我想要的。
回填:如果我没记错的话,这是附在以前的处决中的。因此,如果我的 dag 从 12 月开始每天运行,我将无法从 8 月加载数据。
请问有什么建议吗?
解决方案
对于此用例,您可以按如下方式处理 ETL:
- 从变量中读取最后一个 last_loaded 值。
- 在 last_loaded 到 current_timestamp 或 execution_date 或您选择的任何更高边界之间运行 ETL。
- 将较高的边界存储到变量中。
骨架概述可以是:
def set_dag_variables(**kwargs):
new_value = kwargs['var_value']
Variable.set(key=DAG_ID, value=new_value, serialize_json=True)
last_loaded = Varible.get(key=var_name) # don't do this in production. Use macro instead.
your_higher_boundary_param = datetime.now(tz=None)
op1 = YourOperaror(
task_id='op1_task',
params = {"param1":last_loaded,
param2: your_higher_boundary_param }
)
op2 = PythonOperator(
task_id='set_dag_variable_task',
provide_context=True,
python_callable=set_dag_variables,
op_kwargs={'var_value': your_higher_boundary_param}
)
op1 >> op2
注意:这是非常高的水平,细节很重要!
例如,我Varible.get
在运算符/宏范围之外使用这是一种不好的做法。正确的方法是使用宏,但我为示例的提议对其进行了简化。
推荐阅读
- java - 在接收端获取 API 发送方的 IP 地址
- java - NullPointerException 从 FXML 加载场景图,与 Spring 集成
- react-native - React-Native 和 Mapsforge
- android - 使用 Cordova 为本地静态网站提供服务?
- c++ - 匹配特殊字符的字符串
- python - 无法在 Ubuntu 18.04 上安装 Python-Rocksdb
- java - 如何在属性中使用 Lombok 定义 JSON:@JsonPropertyOrder 和 @JsonIgnore (JAVA)
- android - VoIP通话结束后如何防止最小化的应用程序留在前台?
- javascript - 如何根据 javascript 中定义的语言环境验证数字格式
- java - Liferay 7.1 - 如何为 pdf 文件添加链接 html