首页 > 解决方案 > 如何将 ds 派生参数传递给气流中的 hive sql 运算符

问题描述

使用 Airflow,我想运行一个查询,该查询基于 ds 返回给定期间的所有数据。ds 始终是我的结束日期,但开始日期可能会有所不同。例如,它可以是一周或整月。为了处理这个问题,我想使用每月或每周运行的时间表创建不同的 dag。到目前为止,一切都很好。但是,当我想通过 start_dt 时遇到了麻烦

在我的 sql 模板中,我有这个:

where report_dt between '{{ params.report_start_dt }}' AND '{{ds}}' 

在每月的 dag 中,我想这样传递报告 start_dt:

monthly_profile = HiveOperator(
            hql= mycode.sql
            params={**args,
                'report_start_dt': '{{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }}',
                },
            task_id='monthly_profile',
            )

但是,这失败了,因为我猜该模板不处理嵌套变量。

渲染模板:

where event_dt between {{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }} AND  '2019-07-31'

我看过这篇文章:Airflow: pass {{ ds }} as param to PostgresOperator 但据我所知,我也在做同样的事情,尽管是针对 hiveoperator。

我做错了什么,我怎样才能达到我需要的,记住我也想用一个简单的 ds 偏移量来做我每周运行的 7 天?

标签: pythonjinja2airflow

解决方案


问题是您正在尝试使用 python 库在模板中进行数据操作。jinja2 无法理解您想要什么,这就是您获得该渲染模板的原因。

您可以通过使用 Hive SQL 函数来解决此问题,只需将参数传递给这些函数。因此,在月初,您可以使用:

date_add(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'),
         1 - day(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd')) 
        )

在气流中:

monthly_profile = HiveOperator(
            hql= mycode.sql
            task_id='monthly_profile',
            )

在 SQL 文件中

    where report_dt between date_add(day('{{ execution_date }}','%Y-%m-%d'),
               1 - day('{{ execution_date }}','%Y-%m-%d'))                       
          AND '{{ds}}' 

推荐阅读