python - 尝试使用 Python 和 smart_open 即时解压大文件时出现“EOFError:已到达流结束”错误
问题描述
我正在尝试从远程 Apache 服务器下载并解压缩一组文件。我提供了一个 .tbz (tar.bz2) 文件列表,供您即时下载和解压缩。目标是通过 tar 解压缩器从远程 Apache 服务器流式传输它们,并立即将它们流式传输到我的 Amazon AWS S3 存储桶。我这样做是因为文件可以大到 30Gb。
我使用“smart_open”python 库来抽象出 https 和 s3 管理。
我在此处提供的代码适用于小文件。一旦我尝试使用更大的文件(超过 8Mb)执行此操作,就会收到以下错误:
"EOFError: End of stream already reached"
这是回溯:
Traceback (most recent call last):
File "./script.py", line 28, in <module>
download_file(fileName)
File "./script.py", line 21, in download_file
for line in tfext:
File "/.../lib/python3.7/tarfile.py", line 706, in readinto
buf = self.read(len(b))
File "/.../lib/python3.7/tarfile.py", line 695, in read
b = self.fileobj.read(length)
File "/.../lib/python3.7/tarfile.py", line 537, in read
buf = self._read(size)
File "/.../lib/python3.7/tarfile.py", line 554, in _read
buf = self.cmp.decompress(buf)
EOFError: End of stream already reached
当我打印出要写入流的行时,我可以看到在引发错误之前我仍在处理文件的第一部分。
到目前为止我已经尝试过:
我试图为 open() 和 tarfile.open() 指定相同的缓冲区大小,但没有成功。
我还尝试在每行的写入之间引入一些延迟,但也无济于事。
from smart_open import open
import tarfile
baseUrl = 'https://someurlpath/'
filesToDownload = ['name_of_file_to_download']
def download_file(fileName):
fileUrl = baseUrl + fileName + '.tbz'
with open(fileUrl, 'rb') as fin:
with tarfile.open(fileobj=fin, mode='r|bz2') as tf:
destination = 's3://some_aws_path/' + fileName + '.csv'
with open(destination, 'wb') as fout:
with tf.extractfile(tf.next()) as tfext:
for line in tfext:
fout.write(line)
for fileName in filesToDownload:
download_file(fileName)
我希望能够像处理小文件一样处理大文件。
解决方案
压缩的 tar 提取需要文件搜索,这可能无法使用smart_open 创建的虚拟文件描述符进行。另一种方法是在处理之前将数据下载到块存储中。
from smart_open import open
import tarfile
import boto3
from codecs import open as copen
filenames = ['test.tar.bz2',]
def download_file(fileName):
s3 = boto3.resource('s3')
bucket = s3.Bucket('bucketname')
obj = bucket.Object(fileName)
local_filename = '/tmp/{}'.format(fileName)
obj.download_file(local_filename)
tf = tarfile.open(local_filename, 'r:bz2')
for member in tf.getmembers():
tf.extract(member)
fd = open(member.name, 'rb')
print(member, len(fd.read()))
if __name__ == '__main__':
for f in filenames:
download_file(f)
推荐阅读
- visual-studio-2008 - CAB 安装程序无法复制 System.dll、mscorlib.dll 和 mscorlib.tlb
- objective-c - 编码为 Mpeg-TS 后 Replay Kit 中的 CMSampleBuffer 视频正常,音频已损坏
- python - 在 Salesforce 中使用默认值创建多个列
- java - 如何检测用户输入的字符串是否不符合我的 ANTLR 语法规则?
- sql - 如何从 Oracle SQL 中的连续列中获取运行总计
- c# - 如何按名称对复选框列表进行排序?
- java - 哪些版本的 Cassandra 与 Astyanax 3.10.x 兼容
- java - 如何将类属性限制为 6 个字符并使其最初为 3 个字符和之后的 3 个数值?
- css - 具有重复不规则图案的纯 flexbox 网格
- github - 需要更改 GitHub 上的默认分支