首页 > 解决方案 > 使用 s3hook 仅从 s3 获取文件名

问题描述

我正在创建以下基于 s3CopyObjectOperator 的类,但我必须从 s3 目录复制所有文件并保存到另一个目录,然后删除这些文件。

但我需要我从中复制的目录中的文件名。所以可以说复制源是:

my_bucket/folder1/folder2/my_unique_filename.csv

我需要新的密钥是:

my_bucket/new_folder1/my_unique_filename.csv

所以我想我的问题是我怎样才能从目录中只获取文件名,以便我可以将它用于我的新目录?如果前缀不包括目录,那也可以。

def execute(self, context):

    s3 = S3Hook(self.aws_conn_id)
    s3_conn = s3.get_conn()

    keys = s3.list_keys(bucket_name=self.partition.bucket, prefix=self.partition.key_prefix)


    # copy the files from the bucket
    for key in keys:
        logging.info(f'key :{key}')

        s3_conn.copy_object(Bucket=self.partition.bucket, Key=f'{self.new_key}',
                            CopySource={
                                'Bucket': self.partition.bucket,
                                'Key': key
                                }, 
                                ContentEncoding='csv')

        logging.info(f"self.new_key {self.new_key}")
        s3.delete_objects(self.partition.bucket, key)

标签: python-3.xamazon-s3airflow

解决方案


S3 是一个对象存储,“路径”实际上是名称的一部分。您可以将其视为基本文件名的前缀。

假设您有要附加到文件名的目标前缀,您可以为找到的每个 s3 密钥构建目标密钥。

def execute(self, context):

    s3 = S3Hook(self.aws_conn_id)
    s3_conn = s3.get_conn()

    keys = s3.list_keys(bucket_name=self.partition.bucket, prefix=self.partition.key_prefix)

    for key in keys:
        # prefix should be equal to self.partition.key_prefix 
        prefix, filename = os.path.split(key)
        
        dest_key = f'{self.dest_prefix}/{filename}'

        logging.info(f'Copying: {key} to {dest_key}')
        s3_conn.copy_object(
            Bucket=self.partition.bucket,
            Key=f'{dest_key}',
            CopySource={
                'Bucket': self.partition.bucket,
                'Key': key
            },
            ContentEncoding='csv'
        )

        logging.info(f'Deleting: {key}')
        s3.delete_objects(self.partition.bucket, key)

推荐阅读