google-bigquery - Airflow Jinja 模板不适用于自定义运算符
问题描述
我正在尝试制作如下的自定义 Airflow 运算符:
class NewCheckOperator(BaseOperator):
__mapper_args__ = {
'polymorphic_identity': 'NewCheckOperator'
}
template_fields = ('sql1', 'sql2') # type: Iterable[str]
template_ext = ('.hql', '.sql',) # type: Iterable[str]
ui_color = '#91E6F2'
ratio_formulas = {
'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
}
@apply_defaults
def __init__(
self,
table: str,
template_time: str,
metrics_thresholds: Dict[str, int],
random_param: int,
ratio_formula: Optional[str] = 'max_over_min',
ignore_zero: Optional[bool] = True,
conn_id: Optional[str] = None,
*args, **kwargs
):
super().__init__(*args, **kwargs)
#some logic here
self.sql1 = " '{{ ds }}' "
self.sql2 = "'{{ macros.ds_add(ds, " + str(self.random_param) + ") }}'"
def execute(self, context=None):
hook = self.get_db_hook()
self.log.info('Using ratio formula: %s', self.ratio_formula)
self.log.info('Executing SQL check: %s', self.sql2)
row2 = hook.get_first(self.sql2)
self.log.info('Executing SQL check: %s', self.sql1)
row1 = hook.get_first(self.sql1)
if not row2:
raise AirflowException("The query {} returned None".format(self.sql2))
if not row1:
raise AirflowException("The query {} returned None".format(self.sql1))
#other logic
def get_db_hook(self):
return BaseHook.get_hook(conn_id=self.conn_id)
class BigQueryNewCheckOperator(NewCheckOperator):
template_fields = ('table', 'gcp_conn_id', )
@apply_defaults
def __init__(self,
table: str,
metrics_thresholds: dict,
random_param: int,
date_filter_column: str = 'ds',
gcp_conn_id: str = 'google_cloud_default',
bigquery_conn_id: Optional[str] = None,
use_legacy_sql: bool = True,
*args,
**kwargs) -> None:
super().__init__(
table=table, metrics_thresholds=metrics_thresholds,
date_filter_column=date_filter_column, days_back=days_back,
*args, **kwargs)
if bigquery_conn_id:
warnings.warn(
"The bigquery_conn_id parameter has been deprecated. You should pass "
"the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
gcp_conn_id = bigquery_conn_id
self.gcp_conn_id = gcp_conn_id
self.use_legacy_sql = use_legacy_sql
def get_db_hook(self):
return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
use_legacy_sql=self.use_legacy_sql)
但是由于某种原因,尽管明确设置了and as ,但由于某种原因,模板NewCheckOperator
不起作用(我的意思是它们被视为像 一样的str
文字) 。这是在 之后紧密建模的,所以我不确定为什么气流宏不起作用。这里的任何帮助将不胜感激!" '{{ ds }}' "
sql1
sql2
template_fields
IntervalCheckOperator
BigQueryIntervalCheckOperator
解决方案
我认为是因为以下原因:
self.sql1 = " '{{ ds }}' "
self.sql2 = "'{{ macros.ds_add(ds, " + str(self.random_param) + ") }}'"
您应该只在 JINJA 模板中使用这些宏,或者您可以将这些宏作为 DAG 中的参数传递给 Operator。
Operator 周围可能有一些装饰器/包装器,它们在输入您的 Operator 之前获取模板化字段并呈现它们。您实际上是在获取呈现的参数,然后将它们更改为文字字符串。
推荐阅读
- reactjs - Is there a way to emulate the run frequency of constructor code using the React Hooks API?
- node.js - 使用自定义 POST api 插入两个表
- javascript - 我可以将 JSON Schema 中的字段声明为“JSON Schema”吗?
- sql-server - 如何在 SQL 中将数据框另存为表
- couchbase - Couchbase N1QL 值解码功能
- laravel - 在 database.php 中从 mysql 导入数据库信息
- hash - Set 在检查值时如何在内部实际工作?
- c# - 实现asp.net mvc站点资源全球化的最快方法是什么?
- python - 使用 beautifulsoup 从“值”属性中提取文本
- python - 如何在递归函数中检查文件结尾?