首页 > 解决方案 > 使用 ThreadPoolExecutor 忽略未使用的期货

问题描述

我正在运行两个函数,fast()并且slow(),并行使用ThreadPoolExecutor. 如果fast()返回非None结果,我想使用它,否则,使用slow()结果。这是一个例子:

from concurrent.futures import ThreadPoolExecutor
from time import sleep

def fast():
    sleep(2)
    return 'fast'

def slow():
    sleep(4)
    return 'slow'

def run_parallel():
    with ThreadPoolExecutor() as executor:
        fast_future = executor.submit(fast)
        slow_future = executor.submit(slow)

        fast_result = fast_future.result()
        if fast_result is not None:
            slow_future.cancel()
            return fast_result

        return slow_future.result()

print(run_parallel())

运行输出:

$ time python example.py 
fast

real    0m4.058s
user    0m0.041s
sys 0m0.011s

由于fast()返回了一个非None值,我预计这需要 2 秒而不是 4 秒,特别是因为我有那条线到slow_future.cancel().

我的理想语法是这样的:

combined_future = fast_future.orElse(slow_future)
return combined_future.result()

我该怎么做才能获得这种预期的行为?

标签: pythonfuturethreadpoolexecutor

解决方案


这是由于缓慢的未来不可取消(slow_future.cancel()返回 False),因此线程池执行程序等待线程。尝试类似:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep

abort_flag = False


def fast():
    for i in range(20):
        if abort_flag:
            return None
        sleep(.1)
    return 'fast'


def slow():
    for i in range(40):
        if abort_flag:
            return None
        sleep(.1)
    return 'slow'


def run_parallel():
    global abort_flag
    with ThreadPoolExecutor() as executor:
        abort_flag = False
        fast_future = executor.submit(fast)
        slow_future = executor.submit(slow)
        for f in as_completed((fast_future, slow_future)):
            result = f.result()
            if result is not None:
                abort_flag = True
                return result


print(run_parallel())

推荐阅读