首页 > 解决方案 > Python警报信号,从外部超时当前线程并处理异常?

问题描述

众所周知,python 中的信号只能在主线程内工作,这是我关于这个主题的小片段:

import signal

from threading import Timer
from time import sleep

class timeout:
    """Context manager that interrupts its body with ``TimeoutError`` via ``SIGALRM``.

    On entry a ``SIGALRM`` handler is installed and an alarm is armed for
    ``seconds``; on exit the alarm is cancelled and the handler that was active
    before entry is put back, so the process-wide handler is not left pointing
    at this instance.

    ``signal.signal`` may only be called from the main thread, so this class
    must be entered there.

    Args:
        seconds: whole seconds to wait before the alarm fires.
        error_message: text carried by the raised ``TimeoutError``.
    """

    def __init__(self, seconds=1, error_message='Timeout error'):
        self.seconds = seconds
        self.error_message = error_message
        # Handler that was active before ``__enter__``; restored in ``__exit__``.
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """Signal handler: raise ``TimeoutError`` with the configured message."""
        raise TimeoutError(self.error_message)

    def __enter__(self):
        # ``signal.signal`` returns the handler it replaces; remember it.
        self._previous_handler = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)

    def __exit__(self, type, value, traceback):
        # Cancel any pending alarm first so it cannot fire during the restore.
        signal.alarm(0)
        # ``None`` means the earlier handler was not installed from Python and
        # cannot be passed back to ``signal.signal``.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None

def main():
    """Sleep for 3 seconds inside a 2-second ``timeout`` and report what happens.

    Any ``Exception`` raised while setting up or running the guarded block is
    printed rather than propagated; the closing line is printed either way.
    """
    try:
        guard = timeout(seconds=2)
        with guard:
            # Stand-in for real work; it deliberately outlasts the timeout.
            sleep(3)
            print("don't come here after 3 seconds")
    except Exception as err:
        print("catch here", str(err))
    print("continue ...")

# Run main() on a separate thread: a Timer with a zero interval fires without delay.
t = Timer(interval=0.0, function=main)
t.start()

现在,为了强制它工作,我在线程外部调用了 signal.signal,并挂上一个动态函数作为钩子。

class timeout:
    """Context manager that only arms and cancels the ``SIGALRM`` alarm.

    This variant installs no signal handler of its own; whatever handler is
    registered for ``SIGALRM`` elsewhere receives the alarm.

    Args:
        seconds: whole seconds before the alarm fires.
        error_message: text used by ``handle_timeout``.
    """

    def __init__(self, seconds=1, error_message='Timeout error'):
        self.error_message = error_message
        self.seconds = seconds

    def handle_timeout(self):
        """Raise ``TimeoutError`` carrying ``error_message``."""
        failure = TimeoutError(self.error_message)
        raise failure

    def __enter__(self):
        # Wiring this instance into ``fluid`` is intentionally left disabled here.
        signal.alarm(self.seconds)
        return None

    def __exit__(self, type, value, traceback):
        # A zero argument cancels any alarm that is still pending.
        signal.alarm(0)
        return None

class fluid:
    """Callable built for one delivered signal, holding its number and frame.

    ``error`` is a class-level message; calling an instance raises a
    ``TimeoutError`` with that message and immediately reports it on stdout.
    """

    error = 'Orpheline exception'

    def __init__(self, signum, frame):
        self.frame = frame
        self.signum = signum

    def __call__(self):
        """Raise ``TimeoutError(self.error)``, catch it and print it."""
        failure = TimeoutError(self.error)
        try:
            raise failure
        except Exception as caught:
            print("catch now", str(caught))

# Register the hook at module level, outside the worker thread: every SIGALRM
# builds a ``fluid`` from the signal number and frame, then calls it.
signal.signal(signal.SIGALRM, lambda signum, frame: fluid(signum, frame)())

t = Timer(interval=0.0, function=main)

try:
    t.start()
except Exception as err:
    print("catch there", str(err))

使用猴子补丁解决这个问题会产生以下结果:

唯一对我有用的解决方案是提供一个新的标志值skipvalue,用于检查此并行线程中是否存在异常:

class timeout:
    """Context manager that turns an alarm into a pollable flag.

    Entering patches the ``fluid`` class so that a call on a ``fluid`` instance
    sets ``skipvalue`` on this object instead of raising, arms the alarm and
    returns ``SKIP``, a zero-argument callable that reads the flag.

    Args:
        seconds: whole seconds before the alarm fires.
        error_message: text copied to ``fluid.error`` on entry.
    """

    def __init__(self, seconds=1, error_message='Timeout error'):
        self.seconds = seconds
        self.error_message = error_message
        self.skipvalue = False

        def _read_flag():
            return self.skipvalue

        # Exposed to the ``with`` body so it can poll for expiry.
        self.SKIP = _read_flag

    def handle_timeout(self):
        """Raise ``TimeoutError`` carrying ``error_message``."""
        failure = TimeoutError(self.error_message)
        raise failure

    def timeitout(self):
        """Mark the timeout as expired."""
        self.skipvalue = True

    def __enter__(self):
        fluid.error = self.error_message
        # Route the signal hook to the flag setter rather than to a raise.
        fluid.__call__ = self.timeitout
        signal.alarm(self.seconds)
        return self.SKIP

    def __exit__(self, type, value, traceback):
        # A zero argument cancels any alarm that is still pending.
        signal.alarm(0)
        return None

def main():
    """Sleep for 3 seconds under a 2-second ``timeout`` and poll for expiry.

    The ``with`` statement yields a zero-argument callable that reports whether
    the alarm has fired. Signal handlers run in the main thread, so the expiry
    is observed here by polling that callable after the work, not by an
    exception arriving in this thread.

    Any ``Exception`` is printed rather than propagated; the closing line is
    printed either way.
    """
    try:
        with timeout(seconds=2, error_message="Some message") as timed_out:
            # do_something
            sleep(3)
            if timed_out():
                # The builtin is ``TimeoutError``; the undefined name ``Timeout``
                # produced a NameError here instead of the intended message.
                raise TimeoutError(fluid.error)
            print("don't come here after 3 seconds")
    except Exception as err:
        print("catch here", str(err))
    print("continue ...")

# Run main() on a separate thread: a Timer with a zero interval fires without delay.
t = Timer(interval=0.0, function=main)
t.start()

以上使用睡眠功能最多需要 3 秒,在任意循环中,我需要在每个执行周期检查新值。


我的问题

标签: python multithreading signals python-3.6

解决方案


显然没有办法用计时器(Timer)来做到这一点,但使用带有系统跟踪(sys.settrace)的线程似乎勉强可行

import sys
import trace
import threading
import time
import signal


class thread_with_trace(threading.Thread):
    """Thread that installs a trace function so it can be stopped from outside.

    Keyword arguments read by the constructor (both required):
        target: callable run by the thread; it receives this thread object as
            its only positional argument.
        handler: exception instance that ``kill`` raises in its caller.

    Other positional and keyword arguments are accepted but not used.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(target=kwargs["target"], args=(self,))
        self.killed = False
        self.ex_handler = kwargs["handler"]

    def start(self):
        """Swap ``run`` for the tracing wrapper, then start the thread."""
        # Keep the real ``run`` so the wrapper can call and later restore it.
        self.__run_backup = self.run
        self.run = self.__run
        super().start()

    def __run(self):
        # Tracing has to be enabled from inside the new thread itself.
        sys.settrace(self.globaltrace)
        self.__run_backup()
        self.run = self.__run_backup

    def globaltrace(self, frame, event, arg):
        """Return the per-frame tracer for ``'call'`` events, else ``None``."""
        return self.localtrace if event == 'call' else None

    def localtrace(self, frame, event, arg):
        """Raise ``SystemExit`` on the first ``'line'`` event after ``kill``."""
        if self.killed and event == 'line':
            raise SystemExit()
        return self.localtrace

    def kill(self):
        """Flag the thread for termination and raise the configured handler."""
        self.killed = True
        # Raised in the thread that calls ``kill``, not in the traced thread.
        raise self.ex_handler

class fluid:
    """Callable built for one delivered signal, holding its number and frame.

    ``__call__`` does nothing by default.
    """

    def __init__(self, signum, frame):
        # Stored so that behaviour can later be chosen per signal id.
        self.frame = frame
        self.signum = signum

    def __call__(self):
        """Default action: nothing."""
        return None

# Every SIGALRM builds a ``fluid`` from the signal number and frame, then calls it.
signal.signal(signal.SIGALRM, lambda signum, frame: fluid(signum, frame)())

class timeout:
    """Context manager that asks a thread to stop when the alarm fires.

    Entering patches ``fluid.__call__`` to this object's ``handle_timeout`` and
    arms the alarm; leaving cancels the alarm.

    Args:
        thread: object whose ``kill()`` is invoked on timeout.
        terminatefun: zero-argument callable run after ``kill()`` raised.
        seconds: whole seconds before the alarm fires.
    """

    def __init__(self, thread=lambda: None, terminatefun=lambda: None, seconds=10):
        self.terminatefun = terminatefun
        self.thisthread = thread
        self.seconds = seconds

    def handle_timeout(self):
        """Call ``kill()``; if it raises, print the error and run ``terminatefun``."""
        try:
            self.thisthread.kill()
        except Exception as err:
            print(str(err))
            self.terminatefun()

    def __enter__(self):
        # Route the signal hook to this instance.
        fluid.__call__ = self.handle_timeout
        signal.alarm(self.seconds)
        return None

    def __exit__(self, type, value, traceback):
        # A zero argument cancels any alarm that is still pending.
        signal.alarm(0)
        return None

def stopit():
    """Print the notice that marks the timeout having been handled."""
    notice = "I should be here after two seconds"
    print(notice)

def func(t):
    """Print a heartbeat every 0.1 s, forever, under a 2-second ``timeout``.

    Args:
        t: the thread object handed to ``timeout`` as the thread to stop.
    """
    guard = timeout(thread=t, terminatefun=stopit, seconds=2)
    with guard:
        # No exit condition of its own: the loop ends only when stopped externally.
        while True:
            time.sleep(0.1)
            print("I'm running")


# Start the traced worker; ``handler`` is the exception its kill() will raise.
t1 = thread_with_trace(
    target=func,
    args=[],
    handler=TimeoutError("Ran out of time"),
)
t1.start()

它几乎符合要求,因为SystemExit()在最后一次超时到期后停止线程(在这种情况下为 0.1 秒)


推荐阅读