首页 > 解决方案 > Airflow:创建一个 Operator,它根据 hook 结果返回一个传感器或一个 DummyOperator

问题描述

我想知道是否有一种方法可以构建一个执行 pub/sub 挂钩(或另一个挂钩)的 Operator,如果该对象已经存在,它将失败。如果这个钩子返回一个异常,那么我们操作一个传感器,如果没有,我们继续 DAG。

我试图在考虑到以下元代码的情况下实现它,但还没有做到。

class CheckIfExistOperator(BaseOperator):
    def execute(self, context):
        try:
            PubSubHook(
                ...
            ).create_subscription(
                ...
                fail_if_exists=True
            )
            return DummyOperator(
                task_id='subscriber_already_exists',
                ...
                )
        except PubSubException as e:
            return PubSubPullSensor(
               ...
            )

有什么建议么?谢谢 :)

标签: google-cloud-pubsubgoogle-cloud-composerairflow

解决方案


看来您需要某种分支。在下面的示例中,我使用 aBranchPythonOperator执行一个函数,该函数尝试创建一个新订阅并返回一个字符串,通知任务是成功还是失败。之后,我使用了两个PythonOperators对应于 BranchPythonOperator 返回的字符串的 task_id,并定义了start_op成功失败任务之间的依赖关系

from airflow.contrib.hooks import gcp_pubsub_hook
from airflow.operators import python_operator
from airflow import models

def print_context1(ds, **kwargs):
    return 'THE TASK SUCCEDED'

def print_context2(ds, **kwargs):
    return 'THE TASK FAILED'

def start_function(ds, **kwargs):
    try:        
        response = gcp_pubsub_hook.PubSubHook().create_subscription(
        topic_project="my-project",
        topic="beam",
        subscription="beam_sub2",
        subscription_project=None, 
        ack_deadline_secs=10,
        fail_if_exists=True)
        return "succeeded"
    except gcp_pubsub_hook.PubSubException:
        return "failed"


default_dag_args = {
...
}

with models.DAG(
        'pubsub_airflow',
        default_args=default_dag_args) as dag:

    start_op = python_operator.BranchPythonOperator(
        task_id='start',
        provide_context=True,
        python_callable=start_function
    )

    success = python_operator.PythonOperator(
        task_id='succeeded',
        provide_context=True,
        python_callable=print_context1
    )

    fail = python_operator.PythonOperator(
        task_id='failed',
        provide_context=True,
        python_callable=print_context2
    )

    start_op >> [success,fail]

在您的情况下,您可以使用此代码作为基础,并PythonOperators用您的 DummyOperator 和传感器替换两者。


推荐阅读