首页 > 解决方案 > 使用并发期货时如何共享状态

问题描述

我知道使用传统的多处理库我可以声明一个值并在进程之间共享状态。

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#sharing-state-between-processes

使用较新的concurrent.futures库时,如何在进程之间共享状态?

import concurrent.futures

def get_user_object(batch):
    # do some work
    counter = counter + 1
    print(counter)

def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        threadingResult = executor.map(get_user_object, batches)

def run():
    data_pools = get_data()
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        processResult = executor.map(do_multithreading, data_pools)
    end = time.time()
    print("TIME TAKEN:", end - start)

if __name__ == '__main__':
    run()

我想保持这个计数器的同步值。

在以前的库中,我可能使用过multiprocessing.Value和一个Lock.

标签: pythonpython-3.xpython-multiprocessingpython-multithreading

解决方案


你可以像你想要的那样传递一个initializerand 。这是一个例子:initargsProcessPoolExecutormultiprocessing.Pool

import concurrent.futures
import multiprocessing as mp


def get_user_object(batch):
    with _COUNTER.get_lock():
        _COUNTER.value += 1
        print(_COUNTER.value, end=' ')


def init_globals(counter):
    global _COUNTER
    _COUNTER = counter


def main():
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
        initializer=init_globals, initargs=(counter,)
    ) as executor:
        for _ in executor.map(get_user_object, range(10)):
            pass
    print()


if __name__ == "__main__":
    import sys
    sys.exit(main())

采用:

$ python3 glob_counter.py 
1 2 4 3 5 6 7 8 10 9 

在哪里:

  • for _ in executor.map(get_user_object, range(10)):让您遍历每个结果。在这种情况下,get_user_object()返回None,所以你真的没有什么要处理的;你只是pass,没有采取进一步的行动。
  • 最后一次print()调用给你一个额外的换行符,因为原始print()调用不使用换行符(end=' '')

推荐阅读