google-cloud-pubsub - Airflow:创建一个 Operator,它根据 hook 结果返回一个传感器或一个 DummyOperator
问题描述
我想知道是否有一种方法可以构建一个执行 pub/sub 挂钩(或另一个挂钩)的 Operator,如果该对象已经存在,它将失败。如果这个钩子返回一个异常,那么我们操作一个传感器,如果没有,我们继续 DAG。
我试图在考虑到以下元代码的情况下实现它,但还没有做到。
class CheckIfExistOperator(BaseOperator):
def execute(self, context):
try:
PubSubHook(
...
).create_subscription(
...
fail_if_exists=True
)
return DummyOperator(
task_id='subscriber_already_exists',
...
)
except PubSubException as e:
return PubSubPullSensor(
...
)
有什么建议么?谢谢 :)
解决方案
看来您需要某种分支。在下面的示例中,我使用 aBranchPythonOperator
执行一个函数,该函数尝试创建一个新订阅并返回一个字符串,通知任务是成功还是失败。之后,我使用了两个PythonOperators
对应于 BranchPythonOperator 返回的字符串的 task_id,并定义了start_op与成功和失败任务之间的依赖关系
from airflow.contrib.hooks import gcp_pubsub_hook
from airflow.operators import python_operator
from airflow import models
def print_context1(ds, **kwargs):
return 'THE TASK SUCCEDED'
def print_context2(ds, **kwargs):
return 'THE TASK FAILED'
def start_function(ds, **kwargs):
try:
response = gcp_pubsub_hook.PubSubHook().create_subscription(
topic_project="my-project",
topic="beam",
subscription="beam_sub2",
subscription_project=None,
ack_deadline_secs=10,
fail_if_exists=True)
return "succeeded"
except gcp_pubsub_hook.PubSubException:
return "failed"
default_dag_args = {
...
}
with models.DAG(
'pubsub_airflow',
default_args=default_dag_args) as dag:
start_op = python_operator.BranchPythonOperator(
task_id='start',
provide_context=True,
python_callable=start_function
)
success = python_operator.PythonOperator(
task_id='succeeded',
provide_context=True,
python_callable=print_context1
)
fail = python_operator.PythonOperator(
task_id='failed',
provide_context=True,
python_callable=print_context2
)
start_op >> [success,fail]
在您的情况下,您可以使用此代码作为基础,并PythonOperators
用您的 DummyOperator 和传感器替换两者。
推荐阅读
- lisp - 一个实体中的对象反应器“复制”和“修改”出错 - 需要帮助
- scala - 无法写入 S3
- matplotlib - 如何表示对应于极平面中单个量的复数?
- docker - 如何登录到在特定 Kubernetes pod 中运行的 Docker 容器并运行 test.sh 文件?
- mysql - 这是如何在 SQL 中声明变量的正确方法吗?
- java - 没有 Eclipse 找不到 Tomcat 404
- java - Skype For Business - 接收 IM 无法使用以下步骤
- robocopy - 对通配符文件使用 xcopy 而不是 robocopy
- elm - 你如何迭代一个列表(也许是一个)
- batch-file - 等待八个并行批处理脚本