
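Problem description

Is there a way to refresh an Airflow DAG daily (or on some other regular schedule) based on the code in the DAG file? For example, to update date values that are used in the DAG definition.

For context: I have an Airflow DAG that fetches new table rows from a remote database every day and moves them into a local database. To fetch the newest rows from the remote, we have a function that gets the latest date from the local database. The DAG is currently defined as...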

...
def get_latest_date(tablename):
    # get latest import date from local table
    ....

for table in tables: # type list(dict)

    task_1 = BashOperator(
        task_id='task_1_%s' % table["tablename"],
        bash_command='bash %s/task_1.sh %s' % (PROJECT_HOME, table["latest_date"]),
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_%s' % table["tablename"],
        bash_command='bash %s/task_2.sh' % PROJECT_HOME,
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_1 >> task_2

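Here tables is a list of dicts, and one of the fields, built earlier in the code, is a string representation of the latest date for the given table. When I print the supposed latest date in the task_1.sh script, I find that the date is not updated every day. I need a way to rebuild the tables list daily so that it gets the correct date values.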
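
A likely cause is that table["latest_date"] is evaluated when the DAG file is parsed, not when the task runs, so the command string carries whatever date was current at parse time. The plain-Python sketch below illustrates that difference without Airflow. The local_db dict and build_command are hypothetical stand-ins for the local database and for a lookup deferred to run time.

```python
# Hypothetical stand-in for the local database: table name -> latest import date.
local_db = {"table1": "2020-01-01"}


def get_latest_date(tablename):
    # Stand-in for the real query against the local table.
    return local_db[tablename]


# Eager: the date is looked up once, while this module is being loaded
# (analogous to building the command while the DAG file is parsed).
tables = [{"tablename": "table1", "latest_date": get_latest_date("table1")}]
eager_command = "bash task_1.sh %s" % tables[0]["latest_date"]


def build_command(tablename):
    # Deferred: the date is looked up only when this function is called
    # (analogous to looking it up inside the running task).
    return "bash task_1.sh %s" % get_latest_date(tablename)


# A new import lands after the module was loaded.
local_db["table1"] = "2020-01-02"

print(eager_command)            # still carries the date seen at load time
print(build_command("table1"))  # picks up the new date
```

The eager string keeps the old date after the data changes, while the deferred call sees the new one.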

Tags: airflow

Solution

With the code below, you can fetch latest_date for each table from the local database dynamically and use it in a BashOperator through Airflow XCom.

from airflow import DAG
import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.bash_operator import BashOperator
import logging

from datetime import datetime, timedelta

args = {
    'owner': 'Airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_dag',
    default_args=args,
    schedule_interval=None,
)


def get_latest_date(**kwargs):
    # get latest import date from local table
    logging.info("Table Name: {0}".format(kwargs['table_name']))
    # datetime.today() is used below for demonstration only; in your function, replace it with the actual logic that gets the latest date from your local DB
    latest_date = (datetime.today() - timedelta(days=kwargs['date_diff'])).strftime('%d-%m-%Y')
    logging.info("Latest Date: {0}".format(latest_date))
    # push the latest date to the task's XCom
    kwargs['ti'].xcom_push(key='latest_date', value=latest_date)

    return latest_date

start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)

# the list below no longer needs a latest_date entry in each table dictionary
tables_list = [{'tablename': 'table1'}, {'tablename': 'table2'}, {'tablename': 'table3'}, {'tablename': 'table4'}]
# idx (the index) is used below as the date difference so that different tasks get different latest_date values; this is for demonstration only
for idx, table in enumerate(tables_list): # type list(dict)

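    # ShortCircuitOperator runs the callable like PythonOperator does, but skips the downstream tasks when the callable returns a falsy value (here, an empty latest_date)
    get_latest_date_task = ShortCircuitOperator(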
        task_id='Get_Latest_Date_In_Table_{0}'.format(table['tablename']),
        provide_context=True,
        python_callable=get_latest_date,
        op_kwargs={
            'table_name': table['tablename'],
            'date_diff': idx
        },
        dag=dag)

    # you can build a variable xcom_str like the one below and use it in the BashOperator bash_command, or embed the template directly in bash_command (as in the task_2 BashOperator)
    # note: str.format() collapses the doubled braces in xcom_str to single braces, so bash_command below adds one brace back on each side to restore the Jinja {{ ... }} delimiters
    xcom_str = "{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{}', key='latest_date') }}".format(table['tablename'])
    task_1 = BashOperator(
        task_id='task_1_{0}'.format(table['tablename']),
        bash_command='echo "{' + xcom_str + '}"',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    task_2 = BashOperator(
        task_id='task_2_{0}'.format(table['tablename']),
        bash_command='echo "{{ ti.xcom_pull("Get_Latest_Date_In_Table_' + table['tablename'] + '", key="latest_date") }}"',
        execution_timeout=timedelta(minutes=30),
        dag=dag)

    start_task >> get_latest_date_task >> task_1 >> task_2 >> end_task
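
The brace handling in task_1 is easy to get wrong, because str.format() turns each doubled brace into a single one. The sketch below runs without Airflow and builds the same template string by plain concatenation, then checks that it matches what the task_1 code above produces. The helpers xcom_pull_template and upstream_task_id are hypothetical names introduced here for illustration; they are not part of Airflow.

```python
def upstream_task_id(tablename):
    # Mirrors the task_id pattern of the Get_Latest_Date_In_Table_* tasks above.
    return "Get_Latest_Date_In_Table_{0}".format(tablename)


def xcom_pull_template(task_id, key):
    # Build the Jinja expression by concatenation, so the literal {{ and }}
    # never pass through str.format() and need no escaping.
    return "{{ ti.xcom_pull(task_ids='" + task_id + "', key='" + key + "') }}"


template = xcom_pull_template(upstream_task_id("table1"), "latest_date")
bash_command = 'echo "' + template + '"'

# The approach used for task_1 above: format() collapses {{ }} to { },
# and the extra braces in the concatenation restore them.
collapsed = "{{ ti.xcom_pull(task_ids='Get_Latest_Date_In_Table_{}', key='latest_date') }}".format("table1")
restored = 'echo "{' + collapsed + '}"'

print(bash_command)
print(bash_command == restored)
```

Both routes give the same string. Airflow then renders the {{ ... }} expression when the task runs, which is why the date is resolved at execution time rather than when the DAG file is parsed.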
