bash - 如何xcom_push在BashOperator Airflow中解压.gz文件后获得的文件?
问题描述
我正在使用 BashOperator 解压缩 Airflow 中的 .gz 文件。
gzip -d 存档名称.csv.gz
所以gzip命令用解压后的archive_name.csv文件替换原来的.gz文件
我在气流中的任务
gzip_file = BashOperator(
task_id = "gzip_file",
bash_command = "gzip -d archive_name.csv.gz",
dag=dag
)
现在我需要知道 Airflow 中其他任务中的文件名,所以我希望任务 gzip_file 应该使用 xcom 推送文件名,以便我的其他任务可以提取文件名并使用它。我怎样才能做到这一点?
解决方案
假设您正在运行最新版本的 Ariflow,您可以将do_xcom_push
[1] 设置为 true 并将解压缩的文件作为写入标准输出的最后一个命令回显,其余的应该由气流完成
如果 BaseOperator.do_xcom_push 为 True,则在 bash 命令完成时,写入 stdout 的最后一行也将被推送到 XCom
然后下游任务可以使用 xcom pull 来检索该文件名 [2]
[1] https://github.com/apache/airflow/blob/45244e38d386f20838a2cc85fbc72edca843a5e1/airflow/operators/bash_operator.py#L34
[2] https://github.com/apache/airflow/blob/master/airflow/example_dags /example_xcom.py
推荐阅读
- python - 我的页码显示有错误,我知道原因和位置但我不知道如何解决它们(PyQt)
- powershell - 如何使用 Powershell 为组中的用户分配自定义应用设置策略?
- flutter - Flutter Web:底部溢出问题
- node.js - 如何在不更新节点的情况下克服节点生命周期结束错误?
- r - geom_text 的包装函数
- python - 将 pandas.tseries.offsets.Day 数据类型转换为整数数据类型以进行简单计算
- sql - 替换列中的多个值
- github - 我在没有拉取请求的情况下丢失了我在 github 上的更改
- python - 将 EBNF 语法转换为 pyparsing 会出错
- python - 有没有办法强制 ruamel 在换行符之前的 OrderedDict 中插入一个新的(键:值)对?