python - Amazon S3: fast way to extract part of a large binary file?
Question
I want to read part of a large binary file on S3. The file has the following format:
Header 1: 200 bytes
Data 1: 10000 bytes
Header 2: 200 bytes
Data 2: 10000 bytes
...
Header N: 200 bytes
Data N: 10000 bytes
I want to extract all the headers and save them to a file. N is typically in the range 1e6 to 1e8.
What is the fastest way to do this?
So far I have tried boto3:
import boto3

def s3_open(bucket, key):
    s3 = boto3.resource('s3')
    obj = s3.Object(bucket, key)
    f = obj.get()['Body']
    return f

f = s3_open(bucket, key)
nread = 0
while nread < N:
    remaining = N - nread
    n = min(1000, remaining)
    buf = f.read(n * 10200)
    # read 200 bytes from each of these n records and write them to a file
    nread += n
This is slow when I run it on my local PC. The f.read() call is the bottleneck.
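The header-extraction step elided in the comment above can be sketched with plain slicing (record sizes taken from the format description; `buf` stands in for one `f.read()` result):

```python
RECORD = 200 + 10000  # header + data, per the file layout above

def extract_headers(buf):
    """Concatenate the leading 200-byte header of every record in buf."""
    assert len(buf) % RECORD == 0  # only whole records expected
    return b"".join(buf[i:i + 200] for i in range(0, len(buf), RECORD))

# two fake records: headers of b'H', data of b'D'
buf = (b"H" * 200 + b"D" * 10000) * 2
headers = extract_headers(buf)  # 400 bytes of b'H'
```

Wrapping `buf` in a `memoryview` would avoid copying each 200-byte slice, should that ever show up in profiling.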
Solution
Based on this answer, you can read in parallel with multiprocessing/threading/... by having multiple jobs each read a larger contiguous chunk of the file.
def get_ranges(file_size, chunk_size, n_jobs):
    num_entries, remainder = divmod(file_size, chunk_size)
    assert not remainder  # sanity check for file size
    entries_per_process = num_entries // n_jobs
    assert entries_per_process >= 1
    ranges = [
        [
            pid * entries_per_process * chunk_size,
            (pid + 1) * entries_per_process * chunk_size,
        ]
        for pid in range(n_jobs)
    ]
    # fix up the last chunk in case there's an uneven distribution of jobs and chunks:
    ranges[-1][-1] = file_size
    return ranges

chunk_size = 200 + 10000
file_size = chunk_size * 15000  # assuming 15 000 chunks

ranges = get_ranges(file_size, chunk_size, 16)

for start, end in ranges:
    print(f"spawn something to process bytes {start}-{end}")
which prints something like
spawn something to process bytes 0-9557400
spawn something to process bytes 9557400-19114800
spawn something to process bytes 19114800-28672200
spawn something to process bytes 28672200-38229600
spawn something to process bytes 38229600-47787000
spawn something to process bytes 47787000-57344400
[...]
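As a quick sanity check on those numbers: every boundary should be a multiple of the 10 200-byte record size, so no header is ever split between two workers (boundaries copied from the output above):

```python
chunk_size = 200 + 10000  # one record

boundaries = [0, 9557400, 19114800, 28672200, 38229600, 47787000, 57344400]
for b in boundaries:
    assert b % chunk_size == 0  # each range starts and ends on a record boundary
print("all boundaries are record-aligned")
```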
So, combining this with the linked answer and multiprocessing, something like:
import boto3
import multiprocessing

def process_range(byte_range):
    # To be on the safe side, let's not share the boto3 resource between
    # processes here.
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    # note: HTTP byte ranges are inclusive, hence the - 1 on the end offset
    stream = obj.get(Range='bytes=%d-%d' % (byte_range[0], byte_range[1] - 1))['Body']
    stream.read()  # read the records from the stream and do something with them
    return 42  # lucky number!

if __name__ == '__main__':
    obj = boto3.resource('s3').Object('mybucket', 'mykey')
    ranges = get_ranges(obj.content_length, chunk_size, 50)
    with multiprocessing.Pool() as p:
        # use imap() if you need order!
        for result in p.imap_unordered(process_range, ranges):
            pass
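To actually collect the headers in file order, each worker can return its extracted headers and the parent can join them with the ordered imap(). A local, S3-free simulation of that flow (a thread pool has the same Pool API and is a reasonable fit for I/O-bound S3 reads; the in-memory fake object and its contents are made up for illustration):

```python
from multiprocessing.pool import ThreadPool  # same API as Pool; fine for I/O-bound work

RECORD = 200 + 10000  # header + data

# Fake six-record "object" standing in for the S3 body; header bytes encode
# the record index so ordering is visible in the result.
FAKE_OBJECT = b"".join(bytes([i]) * 200 + b"\x00" * 10000 for i in range(6))

def process_range(byte_range):
    start, end = byte_range
    # With S3 this would be:
    #   obj.get(Range='bytes=%d-%d' % (start, end - 1))['Body'].read()
    buf = FAKE_OBJECT[start:end]
    # keep only the leading 200-byte header of each record
    return b"".join(buf[i:i + 200] for i in range(0, len(buf), RECORD))

ranges = [(0, 3 * RECORD), (3 * RECORD, 6 * RECORD)]
with ThreadPool(2) as pool:
    all_headers = b"".join(pool.imap(process_range, ranges))  # imap keeps order
```

The ordered imap() means the concatenated result matches the on-disk record order even though the two ranges are fetched concurrently.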
All of this is naturally dry-coded and untested, and that range computation may have an off-by-one error, so YMMV, but I hope it helps :)