首页 > 解决方案 > composer 中的 bash 操作符,用于将最近的文件从一个 GCS 存储桶复制到另一个

问题描述

我编写了以下运行 BashOperator 的 DAG 脚本(python 代码)。这很奇怪,因为当我在终端中启动 bash 命令时,bash 命令运行顺畅。但是当我将它包装到 Airflow DAG 中的 BashOperatgor 中时,同样的命令。

此代码的目的是将(今天的)最后一个文件从一个 GCS 存储桶复制到另一个存储桶。下面是代码:

从气流.operators.bash_operator 导入 BashOperator 从气流导入 DAG 从日期时间导入日期时间

DEFAULT_DAG_ARGS = {
    'owner': 'name',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 0,
    'schedule_interval': None
}

with DAG('copy_input_files', default_args=DEFAULT_DAG_ARGS) as dag:
    pre_dag_cp = BashOperator(
        task_id='copy_current_files',
        bash_command="gsutil -m ls -l gs://input/files/UES | grep $(date -I) | sed 's/.*\(gs:\/\/\)/\1/'| gsutil cp -I  gs://output/recent_files "  + "\nexit 0"
    )

我收到以下错误: CommandException: No URLs matched: input/files/UES/TV11_INFODEB.2019_01_02_02.orc并且没有按预期复制文件,当我在基本终端中测试 dag 之外的 bash 命令时,这是可行的,请知道如何解决这个问题

标签: bashgoogle-cloud-platformgoogle-cloud-storageairflow

解决方案


请查看此任务的专用操作员。

from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

            copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
                task_id='copy_single_file',
                source_bucket='data',
                source_object='sales/sales-2017/january.avro',
                destination_bucket='data_backup',
                destination_object='copied_sales/2017/january-backup.avro',
                google_cloud_storage_conn_id=google_cloud_conn_id
            )

参考:https ://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_gcs.py


推荐阅读