首页 > 解决方案 > 如果出现意外异常,自动重新启动线程

问题描述

这个即示例程序:

import logging
import threading
import random
import time
import sys

def Thread1(a, b):
    logging.info('INIT')

    while True:
        r = random.randint(a, b)
    
        logging.debug(f'sleep for {r} seconds')
        logging.debug(f'5 / {r} = { 5 / r}') # possible: ZeroDivisionError: division by zero
        time.sleep(r)
        continue

    logging.info('Exiting')


def Thread2(a, b):
    logging.info('INIT')

    while True:
        r = random.randint(a, b)
        logging.debug(f'5 / {r} = { 5 / r}') # possible: ZeroDivisionError: division by zero

        logging.debug(f'sleep for {r} seconds')
        time.sleep(r)
        continue

    logging.info('Exiting')


#
# my_excepthook
#
def my_excepthook(args):
    #print(f'In excepthook {args=} --- {type(args)}')

    exc_type, exc_value, exc_traceback = sys.exc_info()
    print( 'Handling %s exception with message "%s" in %s' % \
        (exc_type.__name__, exc_value, threading.current_thread().name))

    if threading.current_thread().name == 'Thread_1':
        t1 = threading.Thread(name='Thread_1', target=Thread1, args=(0, 4), daemon=False)
        t1.start()

    if threading.current_thread().name == 'Thread_2':
        t2 = threading.Thread(name='Thread_2', target=Thread2, args=(0, 4), daemon=False)
        t2.start()

threading.excepthook = my_excepthook


if __name__ == "__main__":
    format='%(asctime)s - %(threadName)10s - %(lineno)4d - %(message)s'
    logging.basicConfig(level=logging.DEBUG, format=format)

    #
    # When there is exception, thread will stop working
    #
    t1 = threading.Thread(name='Thread_1', target=Thread1, args=(0, 4), daemon=False)
    t1.start()

    #
    # When there is exception, thread will stop working
    #
    t2 = threading.Thread(name='Thread_2', target=Thread2, args=(0, 4), daemon=False)       
    t2.start()

    #
    # How to restart thread manually
    #
    # logging.debug('Before Main loop')
    # while True:
    #     time.sleep(5)  # how often to check

    #     if t1.is_alive() == False:
    #         logging.critical('Restarting Thread_1')
    #         t1 = threading.Thread(name='Thread_1', target=Thread1, args=(0, 10), daemon=True)
    #         t1.start()

    #     if t2.is_alive() == False:
    #         logging.critical('Restarting Thread_2')
    #         t2 = threading.Thread(name='Thread_2', target=Thread1, args=(0, 10), daemon=True)
    #         t2.start()
   
    logging.info('MAIN END') 

我的问题是 Python 重启线程的方法是什么?

到目前为止,我找到了 2 个替代方案?

首先:是在主程序结束时进行无限循环,其中在每个线程上调用检查 is_alive() 并在必要时重新启动它。

第二:是使用带有自定义功能的threading.excepthook。因为在这两种选择中我都需要在每个线程上检查 is_alive() ,所以它们或多或少是相同的方法。threading.excepthook 的唯一好处是重启线程没有延迟。

我错过了什么,有更好的方法吗?

任何反馈表示赞赏。

标签: pythonthread-safetypython-multithreading

解决方案


您可以尝试重构您的代码并使用ThreadPoolExecutorand pool.submit,如果池的线程内发生任何错误,该错误将被忽略。另一方面,我个人的选择是将其他线程中的所有代码打包成try ... except Exception块。

import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def some_func(_, a, b):
    r = random.randint(a, b)
    logging.debug(f'sleep for {r} seconds')
    logging.debug(f'5 / {r} = {5 / r}')  # possible: ZeroDivisionError: division by zero
    time.sleep(r)


if __name__ == '__main__':
    logging.basicConfig(format="%(threadName)s | %(asctime)s | %(msg)s", level=logging.DEBUG)
    # better to generate tasks in Main thread, than make infinite loop in child thread
    tasks = [_ for _ in range(10000)] 
    with ThreadPoolExecutor(4) as pool:
        # send tasks to other threads
        for t in tasks:
            pool.submit(partial(some_func, a=0, b=1), t)

推荐阅读