首页 > 解决方案 > DAG 不接受参数化消息

问题描述

我有一个 DAG,需要执行 Python opertaor 并将结果字符串作为消息传递给 PubsubPublish Operator。

我下面的代码完美地打印了消息,但是当我将此 DAG 上传到气流时,它不会加载。我认为这是我的 DAG 的结构,并且 pubsubpublish 运算符无法读取参数“消息”

我尝试将消息用作模板字段,但这也无济于事。

def download_yaml():
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_name)
blob = bucket.blob(source_blob_name)
content_blob=blob.download_as_string()
encoded_string = base64.b64encode(content_blob)
return encoded_string

encoded_string = download_yaml()
messages = [
     {'data': b64encode(encoded_string)},

] 
print messages
dag= DAG('pubsub-message-docker', default_args=default_args,schedule_interval=timedelta(days=1))
t2 = PubSubPublishOperator(project=project,topic=topic,task_id='publish-messages', messages=messages,dag=dag)

t1= PythonOperator(task_id='download_yaml_as_string',provide_context=True,python_callable=download_yaml,dag=dag)

t1.set_downstream(t2)

我可以打印“编码字符串”,但是我需要在我的 pubsubpublish 运算符中将编码字符串作为消息传递,以便发布。

标签: google-cloud-platformairflowgoogle-cloud-pubsub

解决方案


这里有两点供您考虑。1. dag运营商之间的信息交换,Xcom应该是更官方的方式。

XComs 让任务交换消息,允许更细微的控制形式和共享状态。该名称是“交叉通信”的缩写。......任何可以腌制的对象都可以作为XCom的值,所以用户应该确保使用适当大小的对象。

XCom 可以“推送”(发送)或“拉取”(接收)。......

任务调用 xcom_pull() 来检索 XCom,可选地应用基于键、源 task_ids 和源 dag_id 等标准的过滤器。……

https://airflow.apache.org/concepts.html#xcoms

  1. 您的 python 文件可能会运行并得到未知结果,因为消息与任务 t1 没有关系。它只是在开始时由函数 download_yml 初始化的。虽然 t1 再次调用了 download_yml,但是没有更改消息。因此,T2 仅获取具有初始值的消息。要解决这个问题,您必须将 t1 中的消息推送到 Xcom,并从 Xcom 拉取 t2 中的消息。

祝你好运。

王勇


推荐阅读