python - 删除标头和尾标后如何将文件从一个位置复制到 S3 中的另一个位置
问题描述
我在 S3 文件夹中有一个大约 2 GB 的文件,其中包含不同长度的标题和尾部,实际数据的长度不同。删除标题和预告片后,我需要以编程方式将此文件复制到 S3 中的另一个位置。谁能帮我这个 ?
文件格式(比如文件名 abc.txt)=>
001|20210930|abc.txt
12345|abcsd|prsdf|20210930|10.0|50
12346|sdfgsd|dfg|20210930|20.0|100
12347|dfgfrg|dfg|20210930|30.0|200
009|3
我尝试在 pandas 中从 S3 加载文件,但由于内存错误而失败。所以不能在这里使用熊猫。
我尝试使用 boto3 库并使用
obj.get()['Body'].read()
,但如何从该数据中删除标题和尾部,然后写回 S3 中的文件?
有没有其他有效的方法?
解决方案
我假设你有一些函数is_header(line)
可以is_trailer(line)
分别告诉你该行是标题还是尾随。然后,这是您可以从 S3 流式传输文件并将其保存回来的方法。
import boto3
s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/def.txt"
r = s3.get_object(Bucket=bucket, Key=key)
sb = r["StreamingBody"]
content = [line for line in sb.iter_lines() if not is_header(line) and not is_trailer(line)]
content = b"".join(content)
r = s3.put_object(Bucket=bucket, Key=new_key, Bytes=content)
流数据以避免内存不足错误
上面的代码假设整个文件可以放入内存,我假设它可以,因为它只有 2 GB。如果没有,您将需要使用Multipart Uploads。
from typing import Optional
import boto3
from boto3.s3.transfer import TransferConfig
import botocore
MB = 1024*1024
class FileNoHeader:
"""Wrapper for a botocore StreamingBody to filter headers/trailers"""
def __init__(self, stream: botocore.response.StreamingBody):
self.stream = stream
self.first_line = True
self.line_generator = self.stream.iter_lines()
def read(self, size: Optional[int] = None) -> bytes:
"""Wrap StreamingBody.iter_lines to read line-by-line while making it look like a fileobj
Parameters
----------
size: int, optional
How much data to read. This is a minimum amount because we are using
StreamingBody.iter_lines to read the file line by line, we can only return
whole lines. If `None`, the default, read the entire file.
This parameter is for compatibilty with the read() method of a file-like object
"""
data = []
amt = 0
line = b""
while size is None or amt < size:
try:
line = next(self.line_generator)
except StopIteration:
if line:
amt += len(line)
data.append(line)
break
if self.is_header(line) or self.is_trailer(line):
line = b""
continue
amt += len(line)
data.append(line)
return b"\n".join(data)
def close(self):
"""Close the underlying StreamingBody"""
self.stream.close()
def is_header(self, line):
# TODO: implement your logic
# right now just skips the first line
if self.first_line:
self.first_line = False
return True
return self.first_line
def is_trailer(self, line):
# TODO: implement your logic
return False
## Usage
config = TransferConfig(multipart_chunksize=1*MB)
s3 = boto3.client("s3")
bucket = "mybucket"
key = "path/to/abc.txt"
new_key = "path/to/abc_no_header.txt"
r = s3.get_object(Bucket=bucket, Key=key)
streaming_body = r["Body"]
data_stream = FileNoHeader(streaming_body)
def tcback(bytes_transferred):
print(f"{bytes_transferred} bytes transferred")
s3.upload_fileobj(
data_stream,
bucket,
new_key,
Config=config,
Callback=tcback
)
侧边栏:AWS Lambda
如果您使用 AWS Lambda 函数,则最多可以拥有 10 GB 的内存。您可以在 AWS 控制台或使用API设置内存。这是boto3和AWS CLI v2的文档。
推荐阅读
- html - 如何在 Bootstrap 4 中使每一列的高度相等?
- ruby-on-rails - mysql2 gem 未加载安装 redmine-agile
- java - Java Prepared Statement 无法更新数据库
- c# - 使用 lambda 表达式将反射属性传递给 .HasKey()
- python - 如何修改 views.py 和 urls.py 以启用 Django 应用程序的端点?
- xpath-2.0 - XPath 2.0 公式中条件的评估顺序
- c# - 将模型的属性显示为标签和文本框的最简单方法是什么?
- debugging - 从 Visual Studio 2017 运行控制台程序时,如何为每次运行创建新的控制台窗口?
- java - 这个排序算法是如何工作的?
- rjava - 无法在 RedHat 上安装 RJDBC 包