首页 > 解决方案 > 删除标头和尾标后如何将文件从一个位置复制到 S3 中的另一个位置

问题描述

我在 S3 文件夹中有一个大约 2 GB 的文件,其中包含不同长度的标题和尾部,实际数据的长度不同。删除标题和预告片后,我需要以编程方式将此文件复制到 S3 中的另一个位置。谁能帮我这个 ?

文件格式(比如文件名 abc.txt)=>

001|20210930|abc.txt
12345|abcsd|prsdf|20210930|10.0|50
12346|sdfgsd|dfg|20210930|20.0|100
12347|dfgfrg|dfg|20210930|30.0|200
009|3
  1. 我尝试在 pandas 中从 S3 加载文件,但由于内存错误而失败。所以不能在这里使用熊猫。

  2. 我尝试使用 boto3 库并使用 obj.get()['Body'].read(),但如何从该数据中删除标题和尾部,然后写回 S3 中的文件?

有没有其他有效的方法?

标签: pythonamazon-s3boto3amazon-s3-bucket

解决方案


我假设你有一些函数is_header(line)可以is_trailer(line)分别告诉你该行是标题还是尾随。然后,这是您可以从 S3 流式传输文件并将其保存回来的方法。

import boto3

s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/def.txt"

r = s3.get_object(Bucket=bucket, Key=key)
sb = r["StreamingBody"]

content = [line for line in sb.iter_lines() if not is_header(line) and not is_trailer(line)]
content = b"".join(content)
r = s3.put_object(Bucket=bucket, Key=new_key, Bytes=content)

流数据以避免内存不足错误

上面的代码假设整个文件可以放入内存,我假设它可以,因为它只有 2 GB。如果没有,您将需要使用Multipart Uploads

这是使用TransferManager的一种方法

from typing import Optional
import boto3
from boto3.s3.transfer import TransferConfig
import botocore

MB = 1024*1024

class FileNoHeader:
    """Wrapper for a botocore StreamingBody to filter headers/trailers"""
    
    def __init__(self, stream: botocore.response.StreamingBody):
        self.stream = stream
        self.first_line = True
        self.line_generator = self.stream.iter_lines()

    def read(self, size: Optional[int] = None) -> bytes:
        """Wrap StreamingBody.iter_lines to read line-by-line while making it look like a fileobj
        
        Parameters
        ----------
        size: int, optional
            How much data to read. This is a minimum amount because we are using
            StreamingBody.iter_lines to read the file line by line, we can only return
            whole lines. If `None`, the default, read the entire file.
            This parameter is for compatibilty with the read() method of a file-like object
        """
        data = []
        amt = 0
        line = b""
        while size is None or amt < size:
            try:
                line = next(self.line_generator)
            except StopIteration:
                if line:
                    amt += len(line)
                    data.append(line)
                break
            if self.is_header(line) or self.is_trailer(line):
                line = b""
                continue
            amt += len(line)
            data.append(line)
        return b"\n".join(data)

    def close(self):
        """Close the underlying StreamingBody"""
        self.stream.close()
        
    def is_header(self, line):
        # TODO: implement your logic
        # right now just skips the first line
        if self.first_line:
            self.first_line = False
            return True
        return self.first_line

    def is_trailer(self, line):
        # TODO: implement your logic
        return False


## Usage
config = TransferConfig(multipart_chunksize=1*MB)
s3 = boto3.client("s3")

bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/abc_no_header.txt"

r = s3.get_object(Bucket=bucket, Key=key)
streaming_body = r["Body"]
data_stream = FileNoHeader(streaming_body)

def tcback(bytes_transferred):
    print(f"{bytes_transferred} bytes transferred")

s3.upload_fileobj(
    data_stream,
    bucket,
    new_key,
    Config=config,
    Callback=tcback
)

侧边栏:AWS Lambda

如果您使用 AWS Lambda 函数,则最多可以拥有 10 GB 的内存。您可以在 AWS 控制台或使用API设置内存。这是boto3AWS CLI v2的文档。


推荐阅读