python - Python 请求:将 iter_content 块流式传输到 pandas read_csv 函数
问题描述
我正在尝试将一个巨大的 csv.gz 文件从 url 读取成块并将其即时写入数据库。我必须在内存中完成所有这些,磁盘上不能存在任何数据。
我有下面的生成器函数,它将响应块生成到 Dataframe 对象中。
它使用请求的 response.raw 作为 pd.read_csv 函数的输入来工作,但它看起来不可靠,有时会引发超时错误:urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(10054, \'WSAECONNRESET\')",)', OSError("(10054, 'WSAECONNRESET')",))
response = session.get(target, stream=True)
df_it = pd.read_csv(response.raw, compression='gzip', chunksize=10**6,
header=None, dtype=str, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
我决定改用 iter_content,因为我读到它应该更可靠。我已经实现了以下功能,但我收到了这个错误:EOFError: Compressed file ended before the end-of-stream marker was reached
。
我认为这与我传入压缩的 Bytes 对象(?)的事实有关,但我不确定如何将 pandas.read_csv 传递给它将接受的对象。
response = session.get(target, stream=True)
for chunk in response.iter_content(chunk_size=10**6):
file_obj = io.BytesIO()
file_obj.write(chunk)
file_obj.seek(0)
df_it = pd.read_csv(file_obj, compression='gzip', dtype=str,
header=None, names=columns, parse_dates=['datetime'])
for i, df in enumerate(self.process_df(df_it)):
if df.empty:
continue
if (i % 10) == 0:
time.sleep(10)
yield df
任何想法都非常感谢!
谢谢
解决方案
你不妨试试这个:
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
"""
Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
input stream.
The stream implements Python 3's newer I/O API (available in Python 2's io module).
For efficiency, the stream is buffered.
"""
class IterStream(io.RawIOBase):
def __init__(self):
self.leftover = None
def readable(self):
return True
def readinto(self, b):
try:
l = len(b) # We're supposed to return at most this much
chunk = self.leftover or next(iterable)
output, self.leftover = chunk[:l], chunk[l:]
b[:len(output)] = output
return len(output)
except StopIteration:
return 0 # indicate EOF
return io.BufferedReader(IterStream(), buffer_size=buffer_size)
然后
response = session.get(target, stream=True)
response.raw.decode_content = decode
df = pd.read_csv(iterable_to_stream(response.iter_content()), sep=';')
我用它来流式传输 .csv 文件odsclient
。它似乎有效,虽然我没有尝试使用 gz 压缩。
推荐阅读
- python - Google Cloud Vertex AI - 模型不支持 400 'dedicated_resources'
- javascript - vuejs 在一个元素上使用多个 v-if
- javascript - Javascript中函数变量的语法错误
- java - Quartz 作业调度程序不会在 Docker 上停止
- kubernetes - StatefulSet 的 Pod 名称解析不起作用
- javascript - JavaScript:如何在滚动期间更好地指示(突出显示)当前导航页面,而不是仅使用“scrollY > x”
- java - 为什么 Hibernate 插入我的实体而不是更新它?
- ssl - 带有 SSL 的 Socket.io 在 OSx 环境中显示错误
- html - 最好在哪里指定表格的宽度 - 在 HTML 或 CSS 类中?
- python - X 轴标签未显示在条形图上