Concatenate results after multiprocessing

Problem description

I have a function that builds a dataframe by processing df with multiprocessing:

Say my df has 10 rows; the processor function will process each of those 10 rows separately. What I want is to concatenate all of the outputs of processor into a single dataframe.

import sys
import multiprocessing
from multiprocessing import Pool

import numpy as np
import pandas as pd


def processor(dff):

    """
    reading data from a data frame and doing all sorts of data manipulation
    for multiprocessing
    """

    return dff


def main(infile, mdebug):

    global debug
    debug = mdebug

    try:
        lines = sum(1 for line in open(infile))
    except Exception as err:
        print("Error {} opening file: {}").format(err, infile)
        sys.exit(2000)

    if debug >= 2:
        print(infile)

    try:
        dff = pd.read_csv(infile)
    except Exception as err:
        print("Error {}, opening file: {}").format(err, infile)
        sys.exit(2000)

    df_split = np.array_split(dff, (lines+1))

    cores = multiprocessing.cpu_count()
    cores = 64

    # pool = Pool(cores)
    pool = Pool(lines-1)

    for n, frame in enumerate(pool.imap(processor, df_split), start=1):
        if frame is not None:
            frame.to_csv('{}'.format(n))

    pool.close()
    pool.join()

if __name__ == "__main__":
    args = parse_args()

    # print("Debug is: {}".format(args.debug))

    if args.debug >= 1:
        print("Running in debug mode: {}".format(args.debug))

    main(infile=args.infile, mdebug=args.debug)

Tags: pandas, multithreading, dataframe, multiprocessing

Solution


You can solve this either with the DataFrame constructor or with concat. Which one is appropriate depends on details of the code you haven't included.
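
For instance, if each call to processor returns a DataFrame, pd.concat stitches the chunks back together; if it instead returns plain rows (dicts or lists), the DataFrame constructor is the better fit. A minimal sketch of both cases (chunk_frames and row_records are just illustrative names, not part of your code):

import pandas as pd

# case 1: each worker returns a DataFrame chunk -> concatenate them
chunk_frames = [pd.DataFrame({'a': [1, 2]}), pd.DataFrame({'a': [3, 4]})]
combined = pd.concat(chunk_frames, ignore_index=True)

# case 2: each worker returns a plain record -> build one frame from the list
row_records = [{'a': 1, 'b': 2}, {'a': 3, 'b': 4}]
combined = pd.DataFrame(row_records)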

Here's a more complete example:

import numpy as np
import pandas as pd
from multiprocessing import Pool


def processor(dff):
    # stand-in for the real per-chunk processing from the question
    return dff


# create dummy dataset
dff = pd.DataFrame(np.random.rand(101, 5), columns=list('abcde'))

# process data
with Pool() as pool:
    result = pool.map(processor, np.array_split(dff, 7))

# put it all back together in one dataframe
result = pd.concat(result)
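
To plug this back into the main() from the question, the loop that writes one CSV per chunk can instead collect the frames and concatenate them once at the end. A sketch assuming the processor, cores and df_split names from your code, with combined.csv as a made-up output path:

# inside main(), replacing the per-chunk to_csv loop
frames = []
with Pool(cores) as pool:
    for frame in pool.imap(processor, df_split):
        if frame is not None:
            frames.append(frame)

combined = pd.concat(frames, ignore_index=True)
combined.to_csv('combined.csv', index=False)  # hypothetical output file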
