python - 从大型压缩文件流式传输 JSON 对象
问题描述
我正在从事一个个人项目,该项目涉及读取 JSON 对象的大文件,这些文件可能包含数百万个条目,这些条目是使用 GZip 压缩的。我遇到的问题是确定如何有效地逐行解析这些对象并将它们存储在内存中,这样它们就不会耗尽我系统上的所有 RAM。它必须能够在以后访问或构造这些对象以进行分析。到目前为止我所尝试的如下
def parse_data(file):
accounts = []
with gzip.open(file, mode='rb') as accounts_data:
for line in accounts_data:
# if line is not empty
if len(line,strip()) != 0:
account = BytesIO(line)
accounts.append(account)
return accounts
def getaccounts(accounts, idx):
account = json.load(accounts[idx])
# creates account object using fields in account dict
return account_from_dict(account)
这个实现的一个主要问题是我无法访问帐户中的同一个对象两次,否则会导致生成 JSONDecodeError。我也不确定这是否是我可以做到的最紧凑的方式。
任何帮助将不胜感激。
编辑:这些文件中存储的数据格式如下:
{JSON Object 1}
{JSON Object 2}
...
{JSON Object n}
编辑:我打算使用存储在这些 JSON 帐户条目中的信息来形成帐户信息中的相似性或模式图。
解决方案
以下是如何随机访问 gzip 压缩文件中的 JSON 对象,方法是首先将其解压缩到一个临时文件中,然后使用tell()
并按seek()
索引检索它们——因此只需要足够的内存来保存每个对象的偏移量。
我发布这个主要是因为你在评论中问我一个这样做的例子......否则我不会有,因为它与流数据不太一样。主要区别在于,与这样做不同的是,它可以访问所有数据,包括能够随意随机访问任何对象。
首先解压缩整个文件确实会引入一些额外的开销,因此除非您需要能够多次访问 JSON 对象,否则可能不值得。显示的实现可能会通过缓存以前加载的对象来加速,但在不确切知道访问模式将是什么的情况下,很难确定。
import collections.abc
import gzip
import json
import random
import tempfile
class GZ_JSON_Array(collections.abc.Sequence):
""" Allows objects in gzipped file of JSON objects, one-per-line, to be
treated as an immutable sequence of JSON objects.
"""
def __init__(self, gzip_filename):
self.tmpfile = tempfile.TemporaryFile('w+b')
# Decompress a gzip file into a temp file and save offsets of the
# start of each line in it.
self.offsets = []
with gzip.open(gzip_filename, mode='rb') as gzip_file:
for line in gzip_file:
line = line.rstrip().decode('utf-8')
if line:
self.offsets.append(self.tmpfile.tell())
self.tmpfile.write(bytes(line + '\n', encoding='utf-8'))
def __len__(self):
return len(self.offsets)
def __iter__(self):
for index in range(len(self)):
yield self[index]
def __getitem__(self, index):
""" Return a JSON object at offsets[index] in the given open file. """
if index not in range(len(self.offsets)):
raise IndexError
self.tmpfile.seek(self.offsets[index])
try:
size = self.offsets[index+1] - self.offsets[index] # Difference with next.
except IndexError:
size = -1 # Last one - read all remaining data.
return json.loads(self.tmpfile.read(size).decode())
def __del__(self):
try:
self.tmpfile.close() # Allow it to auto-delete.
except Exception:
pass
if __name__ == '__main__':
gzip_filename = 'json_objects.dat.gz'
json_array = GZ_JSON_Array(gzip_filename)
# Randomly access some objects in the JSON array.
for index in random.sample(range(len(json_array)), 3):
obj = json_array[index]
print('object[{}]: {!r}'.format(index, obj))
推荐阅读
- ruby-on-rails - Rails 5:验证表单的方法
- swift - 在 SwiftUI 中向 NavigationView 添加搜索栏
- html - 无法为具有第 n 个类型的 hr 元素选择范围
- c - 致命错误:json-c/json.h:没有这样的文件或目录在 ubuntu x64 上为 raspberry pi arm 交叉编译
- javascript - 为什么使用 Nuxt 和 Vuex 找不到我的 getter 和操作
- c - 如何在 C 中将元素添加到数组中?
- php - 如何将名字和姓氏作为键=>值添加到数组中并显示它们?
- javascript - 如何使用 Google 服务帐户和 Google API 来验证 G-Suite 用户的密码是否符合其用户 ID
- php - 根据用户输入 SQL 防止值为负数
- wordpress - 插件激活时,Wordpress 在表格中创建列