Loading Input Files into Pandas DataFrames in Parallel

Problem Description

I have a requirement with three input files that need to be loaded into Pandas DataFrames, after which two of them are merged into a single DataFrame.

The file extensions keep changing; a file might be .txt one time and .xlsx or .csv another time.

How can I run this process in parallel to save the waiting/loading time?

Here is my current code:

from time import time # to measure the time taken to run the code
start_time = time()

Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx"
Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv"
Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv"

import pandas as pd # to work with the data frames
Primary_df = pd.read_excel(Primary_File)
Secondary_1_df = pd.read_csv(Secondary_File_1)
Secondary_2_df = pd.read_csv(Secondary_File_2)

Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID'])
end_time = time()

print(end_time - start_time)

Loading my primary_df and secondary_df takes about 20 minutes. I am therefore looking for an efficient solution, possibly using parallel processing, to save time. I timed the read operations, and they account for most of it, roughly 18 minutes 45 seconds.

Hardware configuration: Intel i5 processor, 16 GB RAM, 64-bit operating system

To qualify for the bounty: I am looking for working code with detailed steps that uses a package available in an Anaconda environment to load my input files in parallel and store each of them in its own pandas DataFrame. This should ultimately save time.

Tags: python, pandas, anaconda

Solution


Try this:

from time import time 
import pandas as pd
from multiprocessing.pool import ThreadPool


start_time = time()

pool = ThreadPool(processes=3)

Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx"
Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv"
Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv"


# Define a function for the thread
def import_xlsx(file_name):
    df_xlsx = pd.read_excel(file_name)
    # print(df_xlsx.head())
    return df_xlsx


def import_csv(file_name):
    df_csv = pd.read_csv(file_name)
    # print(df_csv.head())
    return df_csv

# Submit all three reads to the pool first so they run concurrently.
# Calling .get() immediately after each apply_async would block until
# that read finished and make the loads sequential again.
primary_async = pool.apply_async(import_xlsx, (Primary_File, ))
secondary_1_async = pool.apply_async(import_csv, (Secondary_File_1, ))
secondary_2_async = pool.apply_async(import_csv, (Secondary_File_2, ))

# Collect the results once all three jobs have been submitted
Primary_df = primary_async.get()
Secondary_1_df = secondary_1_async.get()
Secondary_2_df = secondary_2_async.get()

pool.close()
pool.join()

Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID'])
end_time = time()

print(end_time - start_time)
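
The question also notes that the input file extension varies between .txt, .xlsx, and .csv. One way to handle that with the same thread pool is to pick the pandas reader based on the extension before submitting the job. The load_file helper below is only a sketch, not part of the original code, and it assumes that the .txt files are delimited text that pd.read_csv can parse (pass a sep argument if they use a different delimiter):

import os

def load_file(file_name):
    # Choose a pandas reader based on the file extension
    ext = os.path.splitext(file_name)[1].lower()
    if ext == '.xlsx':
        return pd.read_excel(file_name)
    elif ext in ('.csv', '.txt'):
        # Assumes .txt is delimited text; adjust sep= if needed
        return pd.read_csv(file_name)
    raise ValueError("Unsupported file type: " + ext)

# The same pattern then works regardless of extension, e.g.:
# results = [pool.apply_async(load_file, (f, ))
#            for f in (Primary_File, Secondary_File_1, Secondary_File_2)]
# Primary_df, Secondary_1_df, Secondary_2_df = [r.get() for r in results]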

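As an alternative design, the same parallel reads can be written with the standard library's concurrent.futures module, which is available in an Anaconda environment without any extra install. This is a minimal sketch assuming the same three file path variables as above:

from concurrent.futures import ThreadPoolExecutor

# Submit the three reads to a thread pool, then wait for the results
with ThreadPoolExecutor(max_workers=3) as executor:
    primary_future = executor.submit(pd.read_excel, Primary_File)
    secondary_1_future = executor.submit(pd.read_csv, Secondary_File_1)
    secondary_2_future = executor.submit(pd.read_csv, Secondary_File_2)

    Primary_df = primary_future.result()
    Secondary_1_df = secondary_1_future.result()
    Secondary_2_df = secondary_2_future.result()

Note that threads mainly help with the I/O part of the reads; if parsing itself dominates the 18+ minutes, a process pool (multiprocessing.Pool or ProcessPoolExecutor) may give a bigger speed-up, at the cost of pickling the resulting DataFrames back to the parent process.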