Airflow Jinja templating not working for a custom operator

Problem description

I am trying to write a custom Airflow operator like this:

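# Imports below assume Airflow 1.10-style module paths; adjust for your version.
import warnings
from typing import Dict, Iterable, Optional

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class NewCheckOperator(BaseOperator):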

    __mapper_args__ = {
        'polymorphic_identity': 'NewCheckOperator'
    }
    template_fields = ('sql1', 'sql2')  # type: Iterable[str]
    template_ext = ('.hql', '.sql',)  # type: Iterable[str]
    ui_color = '#91E6F2'

    ratio_formulas = {
        'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref),
        'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref,
    }

    @apply_defaults
    def __init__(
        self,
        table: str,
        template_time: str,
        metrics_thresholds: Dict[str, int],
        random_param: int, 
        ratio_formula: Optional[str] = 'max_over_min',
        ignore_zero: Optional[bool] = True,
        conn_id: Optional[str] = None,
        *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        #some logic here 

        self.sql1 = " '{{ ds }}' "
        self.sql2 = "'{{ macros.ds_add(ds, " + str(self.random_param) + ") }}'"

    def execute(self, context=None):
        hook = self.get_db_hook()
        self.log.info('Using ratio formula: %s', self.ratio_formula)
        self.log.info('Executing SQL check: %s', self.sql2)
        row2 = hook.get_first(self.sql2)
        self.log.info('Executing SQL check: %s', self.sql1)
        row1 = hook.get_first(self.sql1)

        if not row2:
            raise AirflowException("The query {} returned None".format(self.sql2))
        if not row1:
            raise AirflowException("The query {} returned None".format(self.sql1))

        # other logic

    def get_db_hook(self):
        return BaseHook.get_hook(conn_id=self.conn_id)

class BigQueryNewCheckOperator(NewCheckOperator): 
    template_fields = ('table', 'gcp_conn_id', )

    @apply_defaults
    def __init__(self,
                 table: str,
                 metrics_thresholds: dict,
                 random_param: int,
                 date_filter_column: str = 'ds',                 
                 gcp_conn_id: str = 'google_cloud_default',
                 bigquery_conn_id: Optional[str] = None,
                 use_legacy_sql: bool = True,
                 *args,
                 **kwargs) -> None:
        super().__init__(
            table=table, metrics_thresholds=metrics_thresholds,
            date_filter_column=date_filter_column, days_back=days_back,
            *args, **kwargs)

        if bigquery_conn_id:
            warnings.warn(
                "The bigquery_conn_id parameter has been deprecated. You should pass "
                "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3)
            gcp_conn_id = bigquery_conn_id

        self.gcp_conn_id = gcp_conn_id
        self.use_legacy_sql = use_legacy_sql

    def get_db_hook(self):
        return BigQueryHook(bigquery_conn_id=self.gcp_conn_id,
                            use_legacy_sql=self.use_legacy_sql)

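However, for some reason the templates in NewCheckOperator are not working (by which I mean they are treated as str literals like " '{{ ds }}' "), even though sql1 and sql2 are explicitly set as template_fields. This is modeled closely on IntervalCheckOperator and BigQueryIntervalCheckOperator, so I'm not sure why the Airflow macros aren't being rendered. Any help here would be greatly appreciated!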

Tags: google-bigquery, airflow

Solution


I think it is because of these lines:

self.sql1 = " '{{ ds }}' "
self.sql2 = "'{{ macros.ds_add(ds, " + str(self.random_param) + ") }}'"
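
To make these two assignments concrete, here is a small sketch of what sql2 holds before rendering and what it would be expected to hold afterwards. The helper names build_sql2_template and ds_add_stand_in are hypothetical. The second is a plain-Python stand-in that assumes macros.ds_add shifts a YYYY-MM-DD date string by a number of days; it is not Airflow's own code.

```python
from datetime import datetime, timedelta


def build_sql2_template(random_param: int) -> str:
    # Same concatenation as in the question: the offset is baked into the
    # template text, but the Jinja expression itself is still unrendered.
    return "'{{ macros.ds_add(ds, " + str(random_param) + ") }}'"


def ds_add_stand_in(ds: str, days: int) -> str:
    # Hypothetical stand-in for macros.ds_add (assumption: it shifts a
    # YYYY-MM-DD string by `days` and returns the same format).
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


# The raw template string, as stored on the operator in __init__.
template = build_sql2_template(-7)

# What the field would be expected to hold once rendered for ds = "2020-01-08".
rendered = "'" + ds_add_stand_in("2020-01-08", -7) + "'"
```

If the operator logs the first form at execute time, the field was never rendered, which matches the symptom described in the question.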

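You should use these macros only inside a Jinja template, or pass them as arguments to the Operator from the DAG.

There may be some decorator or wrapper around the Operator that takes the templated fields and renders them before they reach your Operator. In effect, you are receiving the rendered arguments and then overwriting them with literal strings.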
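
For reference, here is a toy model of how template rendering is generally understood to work in Airflow: shortly before execute() runs, the attributes named in the operator class's template_fields are read from the instance, rendered, and written back. Everything in this sketch is hypothetical (the Toy* classes and the regex-based render function are stand-ins, not Airflow or Jinja code). Under that model, one thing worth checking in the question's code is that BigQueryNewCheckOperator redefines template_fields as ('table', 'gcp_conn_id'), which no longer lists sql1 and sql2.

```python
import re


def render(template: str, context: dict) -> str:
    # Toy stand-in for Jinja: replaces "{{ name }}" with context[name].
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        template,
    )


class ToyBaseOperator:
    template_fields = ()

    def render_template_fields(self, context: dict) -> None:
        # Only attributes named in template_fields are rendered.
        for name in self.template_fields:
            value = getattr(self, name)
            if isinstance(value, str):
                setattr(self, name, render(value, context))


class ToyCheckOperator(ToyBaseOperator):
    template_fields = ("sql1",)

    def __init__(self, table: str) -> None:
        self.table = table
        self.sql1 = " '{{ ds }}' "


class ToyBigQueryCheckOperator(ToyCheckOperator):
    # Redefining the tuple replaces the parent's one, so sql1 is dropped.
    template_fields = ("table",)


class ToyFixedBigQueryCheckOperator(ToyCheckOperator):
    # Extending the parent's tuple keeps sql1 templated.
    template_fields = ("table",) + ToyCheckOperator.template_fields


context = {"ds": "2020-01-01"}

parent = ToyCheckOperator("my_table")
child = ToyBigQueryCheckOperator("my_table")
fixed = ToyFixedBigQueryCheckOperator("my_table")
for operator in (parent, child, fixed):
    operator.render_template_fields(context)
```

In this sketch parent.sql1 and fixed.sql1 are rendered, while child.sql1 stays a literal template string. If the real operator behaves the same way, adding 'sql1' and 'sql2' to the subclass's template_fields would be the first thing to try.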

