首页 > 解决方案 > 使用不同的日期参数重复气流 DAG 以进行数据迁移

问题描述

对于数据迁移,我创建了一个 DAG,它最终在所有具有所需逻辑的任务之后将数据插入到迁移表中。

DAG 有一个类似于下面的 sql,它最初提取数据并提供给其他任务:

sql=" select col_names from tables where created_on >=date1 and created_on <=date2"

对于每个 DAG 运行,我在上面的 sql 中手动更改 date1 和 date2 并启动数据迁移(由于数据块很重,截至目前日期范围长度为 1 周)。

我只想自动化这个日期更改过程,例如,如果我给出日期间隔,在第一个 DAG 运行后,第二次运行开始,依此类推,直到结束日期间隔。

到目前为止我已经研究过,我得到的一个解决方案是气流中的动态 DAGS。但问题是它创建了多个 DAG 文件实例,而且它也很难调试和维护。

有没有办法通过更改日期参数重复 DAG,以便我不再需要手动更改日期。

标签: python-3.xairflow

解决方案


我有类似的请求,这里是我如何访问稍后可在 SQL 中用于回填的日期。

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from datetime import datetime, timedelta

# Following are defaults which can be overridden later on
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 1),
    'end_date': datetime(2020, 8, 3),
    'retries': 0,
}

dag = DAG('helloWorld_v1', default_args=default_args, catchup=True, schedule_interval='0 1 * * *')

def print_dag_run_date(**kwargs):
    print(kwargs)
    execution_date = kwargs['ds']
    prev_execution_date = kwargs['prev_ds']
    return (execution_date, prev_execution_date)


# t1, t2 are examples of tasks created using operators

bash = BashOperator(
    task_id='bash',
    depends_on_past=True,
    bash_command='echo "Hello World from Task 1"',
    dag=dag)

py = PythonOperator(
    task_id='py',
    depends_on_past=True,
    python_callable=print_dag_run_date,
    provide_context=True,
    dag=dag)


py.set_upstream(bash)

推荐阅读