python - 如何将 ds 派生参数传递给气流中的 hive sql 运算符
问题描述
使用 Airflow,我想运行一个查询,该查询基于 ds 返回给定期间的所有数据。ds 始终是我的结束日期,但开始日期可能会有所不同。例如,它可以是一周或整月。为了处理这个问题,我想使用每月或每周运行的时间表创建不同的 dag。到目前为止,一切都很好。但是,当我想通过 start_dt 时遇到了麻烦
在我的 sql 模板中,我有这个:
where report_dt between '{{ params.report_start_dt }}' AND '{{ds}}'
在每月的 dag 中,我想这样传递报告 start_dt:
monthly_profile = HiveOperator(
hql= mycode.sql
params={**args,
'report_start_dt': '{{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }}',
},
task_id='monthly_profile',
)
但是,这失败了,因为我猜该模板不处理嵌套变量。
渲染模板:
where event_dt between {{ (execution_date.replace(day=1)).strftime("%Y-%m-%d") }} AND '2019-07-31'
我看过这篇文章:Airflow: pass {{ ds }} as param to PostgresOperator 但据我所知,我也在做同样的事情,尽管是针对 hiveoperator。
我做错了什么,我怎样才能达到我需要的,记住我也想用一个简单的 ds 偏移量来做我每周运行的 7 天?
解决方案
问题是您正在尝试使用 python 库在模板中进行数据操作。jinja2 无法理解您想要什么,这就是您获得该渲染模板的原因。
您可以通过使用 Hive SQL 函数来解决此问题,只需将参数传递给这些函数。因此,在月初,您可以使用:
date_add(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'),
1 - day(FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'))
)
在气流中:
monthly_profile = HiveOperator(
hql= mycode.sql
task_id='monthly_profile',
)
在 SQL 文件中
where report_dt between date_add(day('{{ execution_date }}','%Y-%m-%d'),
1 - day('{{ execution_date }}','%Y-%m-%d'))
AND '{{ds}}'
推荐阅读
- sql - 在一个查询中使用两个 Select 语句来返回所需的数据集
- python - Pandas:撤消累积(例如累积总和)
- apache-nifi - NiFi十进制数格式
- postgresql - 为什么我在“@”处或附近出现语法错误
- xml - Azure 消息传递项目指南(函数/API/XML 网关)
- java - 使用 ServiceLoader 获取提供多个服务的服务提供者类的相同实例
- java - E/UncaughtException: android.content.res.Resources$NotFoundException: 资源 ID #0x7f0800a8
- node.js - nodejs在天蓝色的活动目录中找到用户
- python - 打印所有文本,只需要一个
- javascript - 从 windows.showModalDialog 迁移到 window.open