bash - composer 中的 bash 操作符,用于将最近的文件从一个 GCS 存储桶复制到另一个
问题描述
我编写了以下运行 BashOperator 的 DAG 脚本(python 代码)。这很奇怪,因为当我在终端中启动 bash 命令时,bash 命令运行顺畅。但是当我将它包装到 Airflow DAG 中的 BashOperatgor 中时,同样的命令。
此代码的目的是将(今天的)最后一个文件从一个 GCS 存储桶复制到另一个存储桶。下面是代码:
从气流.operators.bash_operator 导入 BashOperator 从气流导入 DAG 从日期时间导入日期时间
DEFAULT_DAG_ARGS = {
'owner': 'name',
'depends_on_past': False,
'start_date': datetime.now(),
'retries': 0,
'schedule_interval': None
}
with DAG('copy_input_files', default_args=DEFAULT_DAG_ARGS) as dag:
pre_dag_cp = BashOperator(
task_id='copy_current_files',
bash_command="gsutil -m ls -l gs://input/files/UES | grep $(date -I) | sed 's/.*\(gs:\/\/\)/\1/'| gsutil cp -I gs://output/recent_files " + "\nexit 0"
)
我收到以下错误: CommandException: No URLs matched: input/files/UES/TV11_INFODEB.2019_01_02_02.orc
并且没有按预期复制文件,当我在基本终端中测试 dag 之外的 bash 命令时,这是可行的,请知道如何解决这个问题
解决方案
请查看此任务的专用操作员。
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
copy_single_file = GoogleCloudStorageToGoogleCloudStorageOperator(
task_id='copy_single_file',
source_bucket='data',
source_object='sales/sales-2017/january.avro',
destination_bucket='data_backup',
destination_object='copied_sales/2017/january-backup.avro',
google_cloud_storage_conn_id=google_cloud_conn_id
)
参考:https ://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/gcs_to_gcs.py
推荐阅读
- javascript - 延迟“每个”方法
- python - Error in "from . import views" in urls.py while making migrations
- spring-batch - 如何在春季批处理中的另一个流中定义并行子流?
- imagemap - How to create image-map in jssor slider
- sd-card - Orange Pi 4g iot configuration and flash tool not working
- ubuntu - How to redirect non-www https port XXX to www https port XXX using nginx?
- c# - 在 DockPanel 中拆分项目
- react-native - 在 TextInput 中实现 @mention
- php - CODEIGNITER 自定义重定向,url 中只有域名
- assembly - 为什么汇编语言不需要编译?