首页 > 解决方案 > 如何在python中流式传输数据而不一次将其全部加载到内存中?

问题描述

我正在尝试写入和读取流,而不是一次将所有内容加载到内存中。这是我想象的工作:

import io

stream = io.BytesIO()

def process_stream(stream):
  while True:
    chunk = stream.read(5).decode('utf-8')
    if not chunk:
      return
    yield chunk

# this would be a separate thread, but here we just do it in serial:
for i in range(3):
  stream.write(b'asdf')

for chunk in process_stream(stream):
  print('I read', chunk)

但这实际上并没有打印出任何东西。我可以让它工作,但只有以下两个更改,其中任何一个都要求所有字节一次保存在内存中:

我很困惑增量写入只能通过批量读取来读取,而增量读取只适用于批量写入。如何获得一个恒定内存(假设process_stream超过写作)的解决方案?

标签: pythonio

解决方案


当您使用 for 循环写入流时。您的搜索最终处于最后一个位置。

asdfasdfasdf|
            ^ (Seek)            

因此,当您尝试阅读时,最后一个字符之后没有任何内容,因此在阅读流时您什么也得不到。一种解决方案是将搜索重新定位到流的开头,以便您可以阅读它。为此我们可以使用stream.seek(0)

|asdfasdfasdf
^ (Seek after calling stream.seek(0))            

代码:

import io

stream = io.BytesIO()


def process_stream(stream, chunk_size=5):
    while True:
        chunk = stream.read(chunk_size).decode('utf-8')
        if not chunk:
            return
        yield chunk


# this would be a separate thread, but here we just do it in serial:
for i in range(3):
    stream.write(b'asdf')

stream.seek(0) # Reset the seek so it is at the beginning
for chunk in process_stream(stream):
    print('I read', chunk)

输出:

I read asdfa
I read sdfas
I read df

更多信息:Python io.BytesIO 的 write()、read() 和 getvalue() 方法如何工作?


推荐阅读