首页 > 解决方案 > 使用python同时读取不同的json文件集

问题描述

我有两组文件 b 和 c (JSON)。每个文件的数量通常在 500-1000 之间。现在我正在单独阅读这篇文章。我可以使用多线程同时阅读这些内容吗?我有足够的内存和处理器。

yc=no of c files
yb=no of b files

c_output_transaction_list =[]
for num in range(yc):
    c_json_file='./output/d_c_'+str(num)+'.json'
    print(c_json_file)
    c_transaction_list = json.load(open(c_json_file))['data']['transaction_list']
    c_output_transaction_list.extend(c_transaction_list)
df_res_c= pd.DataFrame(c_output_transaction_list) 


b_output_transaction_list =[]
for num in range(yb):
    b_json_file='./output/d_b_'+str(num)+'.json'
    print(b_json_file)
    b_transaction_list = json.load(open(b_json_file))['data']['transaction_list']
    b_output_transaction_list.extend(b_transaction_list)
df_res_b= pd.DataFrame(b_output_transaction_list) 

标签: pythonjsonpython-3.xpandasmultithreading

解决方案


我使用这种方法将数百个文件并行读取到最终数据帧中。在没有您的数据的情况下,您必须验证这是否符合您的要求。阅读多进程帮助文档会有所帮助。我在 linux 上使用相同的代码(aws ec2 读取 s3 文件)和 windows 读取相同的 s3 文件。我发现这样做可以节省大量时间。

import os
import pandas as pd
from multiprocessing import Pool
# you set the number of processors or just take the cpu_count from the os object. playing around with this does make a difference. For me using the max isn't always the fast overall time
num_proc = os.cpu_count()

# define the funciton that creates a dataframe from your file
# note, this is different where you build the list the create a dataframe at the end
def json_parse(c_json_file):
    c_transaction_list = json.load(open(c_json_file))['data']['transaction_list']
    return pd.DataFrame(c_transaction_list)

# this is multiprocessing function that feeds the file names to the parsing function
# if you don't pass num_proc it defaults to 4
def json_multiprocess(fn_list, num_proc=4):
    with Pool(num_proc) as pool:
        # I use starmap, you may just be able use map
        # if you pass more than the file name, starmap handles zip() very well
        r = pool.starmap(json_parse, fn_list, 15)
        pool.close()
        pool.join()
    return r

# build your file list first
yc=no of c files
flist = []
for num in range(yc):
    c_json_file='./output/d_c_'+str(num)+'.json'
    flist.append(c_json_file)

# get a list of of your intermediate dataframes
dfs = json_multiprocess(flist, num_proc=num_proc)
# concat your dataframe
df_res_c = pd.concat(dfs)

然后对您的下一组文件执行相同操作...使用 Aelarion 评论中的示例来帮助构建文件


推荐阅读