Airflow: how can I automate a query so that it runs for every date specified rather than hard-coding it?

Problem description

I am new to Airflow, so apologies if this has been asked somewhere.

I have a query I run in Hive against a table that is partitioned on year and month, e.g. 202001.

How can I run a query in Airflow that takes a variable, so that different values can be used within the query? For example, taking this example:

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator

default_arg = {'owner': 'airflow', 'start_date': '2020-02-28'}

dag = DAG('simple-mysql-dag',
          default_args=default_arg,
          schedule_interval='00 11 2 * *')

mysql_task = MySqlOperator(dag=dag,
                           mysql_conn_id='mysql_default',
                           task_id='mysql_task',
                           sql='<path>/sample_sql.sql',
                           params={'test_user_id': -99})

where my sample_sql.hql looks like:

ALTER TABLE sample_df DROP IF EXISTS
PARTITION (
    cpd_ym = ${ym}
) PURGE;

INSERT INTO sample_df
PARTITION (
    cpd_ym = ${ym}
)
SELECT
  *
FROM sourcedf;

ANALYZE TABLE sample_df
PARTITION (
    cpd_ym = ${ym}
)
COMPUTE STATISTICS;

ANALYZE TABLE sample_df
PARTITION (
    cpd_ym = ${ym}
)
COMPUTE STATISTICS FOR COLUMNS;

I want to run the above for different values of ym using Airflow, e.g. every month between 202001 and 202110. How can I do this?

Tags: hive, airflow

Solution


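I'm a bit confused, because you are asking about Hive but you show MySqlOperator. In any case, assuming the sql/hql parameter is templated, you can use execution_date directly in the query and extract from it the year and month to use as the partition value.

Example: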

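mysql_task = MySqlOperator(
    dag=dag,
    task_id='mysql_task',
    # '%Y%m' renders a four-digit year plus month, e.g. 202001
    # (lowercase '%y' would render only the two-digit year, e.g. 2001).
    sql="""SELECT {{ execution_date.strftime('%Y%m') }}""",
)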


So in your sample_sql.hql it would be:

ALTER TABLE sample_df DROP IF EXISTS
PARTITION (
    cpd_ym = {{ execution_date.strftime('%Y%m') }}
) PURGE;
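
The question asks for every month from 202001 to 202110. Airflow normally creates one run per schedule interval, so with a monthly schedule, a start_date at the beginning of that range, an end_date at its end, and catchup left enabled, the scheduler should create one run per month, each rendering its own partition value. The sketch below is plain Python, not Airflow code: month_partitions is a hypothetical helper that only lists the values such a backfill would be expected to render, assuming the '%Y%m' format.

```python
from datetime import date


def month_partitions(first, last):
    """Yield YYYYMM strings for every month from first to last, inclusive.

    Hypothetical helper, not part of Airflow: it only mimics the values that
    strftime('%Y%m') would render if there were one run per month.
    """
    year, month = first.year, first.month
    while (year, month) <= (last.year, last.month):
        yield date(year, month, 1).strftime('%Y%m')
        # Advance one month, rolling over into the next year after December.
        month += 1
        if month > 12:
            year, month = year + 1, 1


partitions = list(month_partitions(date(2020, 1, 1), date(2021, 10, 1)))
print(partitions[0], partitions[-1], len(partitions))
```

Comparing this list with the partitions that actually exist in the table after a backfill is a quick way to spot a missed month.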

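You mentioned that you are new to Airflow, so make sure you know what execution_date is and how it is calculated (if you don't, read up on it first). You can also do string operations on the other macros. Choose the macro that fits your needs (execution_date, prev_execution_date, next_execution_date, etc.).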
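
As a rough illustration of those points, here is a minimal sketch in plain Python of what the format strings produce. The execution_date value is an assumed example, not one taken from the question. Keep in mind that execution_date marks the start of the schedule interval, so with a monthly schedule the run for this date would only start about a month later, once the interval has ended.

```python
from datetime import datetime, timedelta

# Assumed example value: the run whose interval starts on 2020-02-02 11:00.
execution_date = datetime(2020, 2, 2, 11, 0)

# '%Y' is the four-digit year, '%y' the two-digit one.
current_ym = execution_date.strftime('%Y%m')
short_ym = execution_date.strftime('%y%m')

# Previous calendar month: step back one day from the first of this month.
last_day_of_prev_month = execution_date.replace(day=1) - timedelta(days=1)
previous_ym = last_day_of_prev_month.strftime('%Y%m')

print(current_ym, short_ym, previous_ym)
```

The partitions in the question look like 202001, which matches the four-digit '%Y%m' form.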

