首页 > 解决方案 > 尝试使用 Python 和 smart_open 即时解压大文件时出现“EOFError:已到达流结束”错误

问题描述

我正在尝试从远程 Apache 服务器下载并解压缩一组文件。我提供了一个 .tbz (tar.bz2) 文件列表,供您即时下载和解压缩。目标是通过 tar 解压缩器从远程 Apache 服务器流式传输它们,并立即将它们流式传输到我的 Amazon AWS S3 存储桶。我这样做是因为文件可以大到 30Gb。

我使用“smart_open”python 库来抽象出 https 和 s3 管理。

我在此处提供的代码适用于小文件。一旦我尝试使用更大的文件(超过 8Mb)执行此操作,就会收到以下错误:

"EOFError: End of stream already reached"

这是回溯:

Traceback (most recent call last):
  File "./script.py", line 28, in <module>
    download_file(fileName)
  File "./script.py", line 21, in download_file
    for line in tfext:
  File "/.../lib/python3.7/tarfile.py", line 706, in readinto
    buf = self.read(len(b))
  File "/.../lib/python3.7/tarfile.py", line 695, in read
    b = self.fileobj.read(length)
  File "/.../lib/python3.7/tarfile.py", line 537, in read
    buf = self._read(size)
  File "/.../lib/python3.7/tarfile.py", line 554, in _read
    buf = self.cmp.decompress(buf)
EOFError: End of stream already reached

当我打印出要写入流的行时,我可以看到在引发错误之前我仍在处理文件的第一部分。

到目前为止我已经尝试过:

  1. 我试图为 open() 和 tarfile.open() 指定相同的缓冲区大小,但没有成功。

  2. 我还尝试在每行的写入之间引入一些延迟,但也无济于事。

from smart_open import open
import tarfile

baseUrl = 'https://someurlpath/'
filesToDownload = ['name_of_file_to_download']

def download_file(fileName):
    fileUrl = baseUrl + fileName + '.tbz'
    with open(fileUrl, 'rb') as fin:
        with tarfile.open(fileobj=fin, mode='r|bz2') as tf:
            destination = 's3://some_aws_path/' + fileName + '.csv'
            with open(destination, 'wb') as fout:
                with tf.extractfile(tf.next()) as tfext:
                    for line in tfext:
                        fout.write(line)


for fileName in filesToDownload:
    download_file(fileName)

我希望能够像处理小文件一样处理大文件。

标签: pythonpython-3.xamazon-s3tartarfile

解决方案


压缩的 tar 提取需要文件搜索,这可能无法使用smart_open 创建的虚拟文件描述符进行。另一种方法是在处理之前将数据下载到块存储中。

from smart_open import open
import tarfile
import boto3
from codecs import open as copen

filenames = ['test.tar.bz2',]

def download_file(fileName):
   s3 = boto3.resource('s3')
   bucket = s3.Bucket('bucketname')
   obj = bucket.Object(fileName)
   local_filename = '/tmp/{}'.format(fileName)
   obj.download_file(local_filename)
   tf = tarfile.open(local_filename, 'r:bz2')
   for member in tf.getmembers():
      tf.extract(member)
      fd = open(member.name, 'rb')
      print(member, len(fd.read()))
if __name__ == '__main__':
   for f in filenames:
      download_file(f)

推荐阅读