首页 > 技术文章 > python异步回调顺序?是否加锁?

tarantino 2020-12-17 17:26 原文

话不多说,直接上代码:

import time
from functools import partial
from concurrent.futures.process import ProcessPoolExecutor


# 异步回调需要加锁吗?
"""
疑问来源,在做数据分析时,为了提高程序速度,使用了进程池,并为了汇总每个进程处理的结果,使用到回调函数,
我的回调函数中需要对数据进行修改,因此不得不考虑数据安全问题。
按照我的想法:回调在进程执行完毕之后才发生,此时已经不在是并发了,那么就不会有数据安全问题,无需加锁。
但是稳妥起见,还是测试一波。
我建立了下面这个demo1,在回调的时候特地sleep2秒,然后计算程序运行总时间,根据打印结果,一共花费8秒多,
显然在处理回调函数pring_n的时候程序时同步执行的,那么自然无需加锁。
那么执行回调函数print_n的顺序是怎样的呢?是按照对应的进程结束时间调用的吗?根据源码注释:
These callables are called in the order that they were added(回调函数按照他们被添加的顺序调用)
也就是说回调函数调用顺序跟他所在进程被添加顺序一致,而非按照他所在的进程的结束时间执行。
为了验证这一点,我做了demo2,将第二个进程(i为1)sleep更长时间,从结果来看,和demo1顺序一致,
"1经过计算后的结果是1"并未在最后打印,证实了源码说法。
"""


# demo1
# def count():
#     for i in range(4):
#         yield i
#
#
# def print_n(i, ret, obj):
#     print(f'{i}经过计算后的结果是{obj.result()}')
#     ret.append(obj.result())
#     time.sleep(2)
#
#
# def do_calc(i):
#     return i ** 2
#
#
# if __name__ == '__main__':
#     s = time.time()
#     l = Lock()
#     pool = ProcessPoolExecutor(4)
#     ret = []
#     for i in range(4):
#         pool.submit(do_calc, i).add_done_callback(partial(print_n, i, ret))
#     pool.shutdown(wait=True)
#     print('end', time.time() -s)
#     print('计算结果', ret)


# demo2
def count():
    for i in range(4):
        yield i


def print_n(i, ret, obj):
    print(f'{i}经过计算后的结果是{obj.result()}')
    ret.append(obj.result())
    if i == 1:
        time.sleep(4)


def do_calc(i):
    return i ** 2


if __name__ == '__main__':
    s = time.time()
    pool = ProcessPoolExecutor(4)
    ret = []
    for i in range(4):
        pool.submit(do_calc, i).add_done_callback(partial(print_n, i, ret))
    pool.shutdown(wait=True)
    print('end', time.time() -s)
    print('计算结果', ret)

代码运行结果我没有打印出来,大家可以copy一下运行。

另外,python中很多并发库都支持回调机制,大家感兴趣可以试验一下,增强对并发的理解。

推荐阅读