首页 > 解决方案 > 重新处理 Airflow 的历史数据

问题描述

我正在将一些管道迁移到气流中。我希望能够在特定时间范围内针对历史负载运行一些 DAG,并且我正在探索我的选择。注意:我不想重新执行以前的运行(例如过去 10 天),但我希望能够随时根据 last_loaded 时间戳变量(例如 2017-12-09 00:00:00.000000)重新加载数据需要(甚至在创建 DAG 之前)。此变量也用于外部调用 API。

我脑子里总共有4个概念:

  1. 当前的 dag 运行实现了通过元数据数据库中的 xcom 表交换此变量。尽管每次我想修改它时,我都必须更新数据类型为 blob 的字段。我什至不确定这是否可能。

  2. 将此参数保存在其他位置。易于实施的解决方案,但我不想重新发明轮子。如果气流总是实现某些功能,我想探索它。

  3. 气流变量:到目前为止,可能没有最受认可的气流概念,但我确实觉得这就是我想要的。

  4. 回填:如果我没记错的话,这是附在以前的处决中的。因此,如果我的 dag 从 12 月开始每天运行,我将无法从 8 月加载数据。

请问有什么建议吗?

标签: airflow-schedulerairflow

解决方案


对于此用例,您可以按如下方式处理 ETL:

  1. 从变量中读取最后一个 last_loaded 值。
  2. 在 last_loaded 到 current_timestamp 或 execution_date 或您选择的任何更高边界之间运行 ETL。
  3. 将较高的边界存储到变量中。

骨架概述可以是:

def set_dag_variables(**kwargs):
    new_value = kwargs['var_value']
    Variable.set(key=DAG_ID, value=new_value, serialize_json=True)


last_loaded = Varible.get(key=var_name) # don't do this in production. Use macro instead.
your_higher_boundary_param = datetime.now(tz=None)

op1 = YourOperaror(
    task_id='op1_task',
    params = {"param1":last_loaded,
             param2: your_higher_boundary_param }
)
op2 = PythonOperator(
    task_id='set_dag_variable_task',
    provide_context=True,
    python_callable=set_dag_variables,
    op_kwargs={'var_value': your_higher_boundary_param}
    )
op1 >> op2

注意:这是非常高的水平,细节很重要!

例如,我Varible.get在运算符/宏范围之外使用这是一种不好的做法。正确的方法是使用宏,但我为示例的提议对其进行了简化。


推荐阅读