google-cloud-functions - 如何在 dag 云作曲家中从外部触发任务
问题描述
我想要一个基本上看起来像这样的数据管道
其中多个任务由相应的 pubsub 消息触发,处理来自 pubsub 消息输入的数据,最后一个任务只有在所有这些工作流完成后才会触发。我设法使用 PubSub 触发整个 DAG(按照本指南对 PubSub 进行修改),但它会触发整个 DAG,而不是单个任务。有没有办法只在外部触发 DAG 中的 1 个任务(来自 Cloud Function/PubSub?)
编辑
这是我认为 DAG 代码的简化版本:
import google.cloud.bigquery as bigquery
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators import python_operator
from airflow.operators import dummy_operator
def task1_1(**kwargs):
# I want this function to take the table name of source 1 from pubsub1, reads the table from BigQuery and processes it
client_bq = bigquery.Client()
table_name = kwargs['dag_run'].conf.get('message')
data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
# ETL Code
# .....
def task2_1(**kwargs):
# I want this function to take the table name of source 2 from pubsub2, reads the table from BigQuery and processes it
client_bq = bigquery.Client()
table_name = kwargs['dag_run'].conf.get('message')
data = client_bq.query(f"SELECT * FROM {table_name}").result().to_dataframe()
# ETL Code
# .....
def task_combine():
# This task is triggered when task1_1 and task2_1 are done
# More ETL code
with DAG(
'clean_am_workflow',
schedule_interval=None,
start_date=datetime.datetime.today() - datetime.timedelta(days=5),
catchup=False) as dag:
source_1 = python_operator.PythonOperator(
task_id='process_source_1',
python_callable=task1_1,
provide_context=True
)
source_2 = python_operator.PythonOperator(
task_id='process_source_2',
python_callable=task2_1,
provide_context=True
)
combine = python_operator.PythonOperator(
task_id='combine_sources',
python_callable=task_combine,
provide_context=True
)
[source_1, source_2] >> combine
解决方案
您需要的不是触发dag本身,而是根据bigquery分别触发不同的任务。这可以通过气流传感器来实现。 https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/index.html SQL 传感器: https ://airflow.apache.org/docs/apache-airflow/stable/_modules/气流/传感器/sql.html
在这种情况下,dag 将由正常的 cron 触发。2 传感器任务将定期查询 bigquery,如果该查询返回“good to go”,那么它将启动任务。因为 2 传感器是独立的,所以最后一个任务只有在传感器和任务都完成时才会执行。
推荐阅读
- c# - (UNITY) 是否可以根据 Screen.Orientation 更改布局而不复制所有游戏对象?
- javascript - 从同一 LAN 中的其他设备访问时,Node 和 Express 后端不提供图像资源
- java - 如何使用套接字将多个服务器连接到 Java 中的客户端?
- google-sheets - 计算Google表格中一定范围内素数数量的公式
- amazon-dynamodb - DynamoDB - 全局表冲突解决
- c# - RSACng 无法验证哈希
- c - 非阻塞连接不向 kqueue 报告完成
- c++ - 如何将枚举类型向量的向量保存在
- r - 包 'ggbiplot' 不可用(对于 R 版本 3.5.3)?
- react-native - 在调用函数 React-Native 上保存 textInputs 的值