首页 > 解决方案 > 如何xcom_push在BashOperator Airflow中解压.gz文件后获得的文件?

问题描述

我正在使用 BashOperator 解压缩 Airflow 中的 .gz 文件。

gzip -d 存档名称.csv.gz

所以gzip命令用解压后的archive_name.csv文件替换原来的.gz文件

我在气流中的任务

gzip_file = BashOperator(
    task_id = "gzip_file",
    bash_command = "gzip -d archive_name.csv.gz",
    dag=dag
)

现在我需要知道 Airflow 中其他任务中的文件名,所以我希望任务 gzip_file 应该使用 xcom 推送文件名,以便我的其他任务可以提取文件名并使用它。我怎样才能做到这一点?

标签: bashgzipairflow

解决方案


假设您正在运行最新版本的 Ariflow,您可以将do_xcom_push[1] 设置为 true 并将解压缩的文件作为写入标准输出的最后一个命令回显,其余的应该由气流完成

如果 BaseOperator.do_xcom_push 为 True,则在 bash 命令完成时,写入 stdout 的最后一行也将被推送到 XCom

然后下游任务可以使用 xcom pull 来检索该文件名 [2]

[1] https://github.com/apache/airflow/blob/45244e38d386f20838a2cc85fbc72edca843a5e1/airflow/operators/bash_operator.py#L34
[2] https://github.com/apache/airflow/blob/master/airflow/example_dags /example_xcom.py


推荐阅读