首页 > 解决方案 > 了解如何使用多处理池

问题描述

我希望有人能够帮助我解决一些关于多处理的问题,以及我是否使用正确的方法来解决我正在尝试做的事情。

这是我已经编写的一些示例代码,我认为它显示了我正在尝试完成的工作:

import multiprocessing
import time


# Example Class that does some math things
class DoMath(object):
    def __init__(self, total):
        self.total = total

    def add(self, a, b):
        print("Addition: ", int(a+b))
        return int(a+b)

    def sub(self, a, b):
        print("Subtraction: ", int(a - b))
        return int(a-b)

    def totals(self, add, minus):
        self.total += (add + minus)
        print("Current Total: ", self.total)
        return int(self.total)


# Pool worker doing simple tasks, sleep is there so it doesn't complete instantly
def worker(a, b):
    time.sleep(1)
    add = values.add(a, b)
    sub = values.sub(a, b)
    values.totals(add, sub)
    return values


# Call back function
def callback(x):
    print('worker done')
    print(x.total)
    return x


if __name__ == '__main__':
    math_numbers = {"1": {"a": 1, "b": 1}, "2": {"a": 2, "b": 2}}

    values = DoMath(0)

    pool_size = 1
    pool = multiprocessing.Pool(
        processes=pool_size
    )

    for key, params_dict in math_numbers.items():
        test = pool.apply_async(worker, args=tuple(), kwds=params_dict, callback=callback)

    pool.close()
    pool.join()

    print(test)
    print(values.total)

当我运行它时,这些是我得到的结果:

Addition:  2
Subtraction:  0
Current Total:  2
worker done
2
Addition:  4
Subtraction:  0
Current Total:  6
worker done
6
<multiprocessing.pool.ApplyResult object at 0x104c5a0b8>
0

我的第一个问题围绕着我如何DoMath在第 43 行实例化类。像这样启动类似乎可行,但我的问题在于我的实际代码,所有内容都不在一个文件中,我无法弄清楚如何将类传递给apply_async函数。尝试将其放入 json dict 似乎对我不起作用。我的实际代码正在构建一个连接池,因此我可以向 API 发出 REST 请求,目前我能够让它工作的唯一方法是为每个工作人员创建一个新连接。这似乎效率极低,有没有更好的方法来解决这个问题?这是我拥有的与我想做的示例:

当前的:

def worker(arg1, arg2, arg3):
    connection = connect(
        url=arg1,
        port=arg2
    )
    connection.api(do_something=arg3)

我想做的事:

def worker(arg3, connection):
    connection.api(do_something=arg3)

其次,这可能是一个根本性的误解,但是有没有办法在池完全完成之前处理这些工作人员的结果?使用我的实际代码,我运行了超过 15000 名工作人员,每个工作人员将返回一个大约 10k 项长的列表。我希望将这些写入一个文件,但不是创建 15,000 多个文件,而是希望该过程将它们附加到文件中,每 100k 行滚动到一个新文件。我意识到我可以强制每个工作人员只将他们的输出写入一个文件,无论是在工作人员内部还是在回调中,但我担心如果我尝试在多个工作人员同时编写同一个文件的情况下进行翻转过程数据可能会丢失或未正确提交。

在我分享的第一个代码块的第 56 和 57 行,我对这些结果有点困惑。我不确定如何使用该ApplyResult对象或它应该包含的内容。values.total此外,当池和回调都返回实际值 6 时,我真的不明白为什么工作人员之外的对象返回 0。这里发生了什么事?

标签: pythonimportpython-multiprocessingglobals

解决方案


推荐阅读