首页 > 解决方案 > 使用异常取消长函数

问题描述

我有execute调用一些非常长的任务的功能。我希望有可能取消此类任务。我想过抛出一个异常并在外面捕获它。问题是,我的异常在future. 如何修复此代码以正确取消?

def cancel_task(self):
   while True:
      process = get_process()
      if process.status == "CANCELLED":
         raise OperationCancelledException()
      time.sleep(1)

def execute(self):
   with ThreadPoolExecutor() as executor:
      future = executor.submit(self.cancel_task)
      result = some_very_long_task()
      future.cancel()

   return result
# somewhere in the code where execute() is called
try:
   execute()
except OperationCancelledException:
   pass
except Exception:
   raise

标签: pythonpython-3.x

解决方案


此处已初始化未来对象:

future = executor.submit(self.cancel_task)

然后,我们运行长任务:

result = some_very_long_task()

此调用some_very_long_task()将首先运行到完成,然后再继续下一行。无论未来对象此时是否引发异常,都不会影响当前执行。实际上,正如记录的那样,您必须显式调用Future.result()来重新引发提交过程中发生的任何异常(这里是self.cancel_task)。

class concurrent.futures.Future

result(timeout=None)

如果调用引发异常,此方法将引发相同的异常。

所以即使你叫它:

future = executor.submit(self.cancel_task)
result = some_very_long_task()
future.result()

它只会在some_very_long_task()运行完成后运行并重新引发任何异常,因此没有意义,因为它实际上并没有取消/停止长任务的执行。

另外一个附注,未来的对象一旦开始就不能被取消,如文件所述:

cancel()

尝试取消通话。如果调用当前正在执行或完成运行并且无法取消,则该方法将返回 False,否则该调用将被取消并且该方法将返回 True。

即使提交some_very_long_task给 executor 并将timeout参数设置为result()也无济于事,因为它仍然会等待任务完成,只是它会在完成后超过TimeoutError超时时间时引发 a 。

替代解决方案

也许你会找到一种方法,但似乎concurrent.futures不是这项工作的工具。您可以考虑改用多处理

  1. 产生一个新的进程some_very_long_task
  2. 在后台运行进程
  3. 当进程在后台运行时,检查它是否必须已经取消。
  4. 如果该过程完成,则照常进行。
  5. 但是如果这个过程还没有完成并且我们已经收到一个取消它的信号,那么终止这个过程。
from datetime import datetime, timezone
import multiprocessing
import time

class OperationCancelledException(Exception):
    pass

def some_very_long_task(response):
    print("Start long task")
    time.sleep(10)  # Simulate the long task with a sleep
    print("Finished long task")
    response['response'] = "the response!"

def get_process_status():
    # Let's say the signal to cancel an ongoing process comes from a file.
    with open('status.txt') as status_file:
        return status_file.read()

def execute():
    response = multiprocessing.Manager().dict()

    proc = multiprocessing.Process(target=some_very_long_task, args=(response,), kwargs={})
    proc.start()

    while proc.is_alive():
        status = get_process_status()
        if status == "CANCELLED":
            proc.terminate()
            raise OperationCancelledException()
        time.sleep(1)

    proc.join()
    return response.get('response')

try:
    print(datetime.now(timezone.utc), "Script started")
    result = execute()
except OperationCancelledException:
   print("Operation cancelled")
else:
    print(result)
finally:
    print(datetime.now(timezone.utc), "Script ended")

输出如果status.txt包含"PENDING"

$ python3 script.py 
2021-09-08 13:17:32.234437+00:00 Script started
Start long task
Finished long task
the response!
2021-09-08 13:17:42.293814+00:00 Script ended
  • 如果没有取消它的信号,任务将运行完成(10 秒睡眠)。

如果status.txt包含则输出"PENDING",然后"CANCELLED"在脚本运行时更改为:

$ python3 script.py 
2021-09-08 13:19:13.370828+00:00 Script started
Start long task
Operation cancelled
2021-09-08 13:19:16.403367+00:00 Script ended
  • 如果有取消它的信号,任务会在 3 秒后(文件更新的时间)停止。

相关参考:


推荐阅读