首页 > 解决方案 > Python 队列:如何延迟/延长/修改阻塞获取的超时?

问题描述

我有一个queue.Queue由线程填充的。一个方法尝试从这个队列接收超时。现在让我们说,另一个线程可以重置我们队列等待的超时,如果队列没有及时送入,我们的接收器函数应该继续,更新超时。我可以如下实现,但是我必须修改内置queue.Queue类,以便在等待期间可以修改方法中的endtime参数......有没有更好的解决方案?get()(我不想使用异步...)

from threading import Thread
from queue import Queue, Empty
import time

q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()

def receive():
    try:
        _res = q.get(block=True, timeout=TIMEOUT)
        print(f'get @ {time.time()-t0}')
        return _res
    except Empty:
        print(f'to @ {time.time()-t0}')
        return None

def feed_queue():
    time.sleep(PUT_TIME)
    print(f'put @ {time.time()-t0}')
    q.put_nowait(42)

def reset_timeout():
    time.sleep(RESET_TIME)
    with q.mutex:
        q.endtime += TIMEOUT
    print(f'reset @ {time.time()-t0}')

if __name__ == '__main__':
    Thread(target=feed_queue).start()
    Thread(target=reset_timeout).start()
    res = receive()
    print('res:', res)

这产生:

reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42

在 queue.py 中进行了以下修改以使其正常工作:

Index: queue.py
===================================================================
--- queue.py    (revision 28725)
+++ queue.py    (working copy)
@@ -52,6 +52,7 @@
         # drops to zero; thread waiting to join() is notified to resume
         self.all_tasks_done = threading.Condition(self.mutex)
         self.unfinished_tasks = 0
+        self.endtime = 0

     def task_done(self):
         '''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
             elif timeout < 0:
                 raise ValueError("'timeout' must be a non-negative number")
             else:
-                endtime = time() + timeout
+                self.endtime = time() + timeout
                 while not self._qsize():
-                    remaining = endtime - time()
+                    remaining = self.endtime - time()
                     if remaining <= 0.0:
                         raise Empty
                     self.not_empty.wait(remaining)

标签: pythonmultithreadingasynchronousqueuetimeout

解决方案


您可以创建自己的类,继承Queue和添加全局变量endtime,如下所示:

class Waszil(Queue):
    def __init__(self, maxsize=0):
        super().__init__(self)
        self.maxsize = maxsize
        self._init(maxsize)
        self.endtime = 0

然后只需更改q = Queue()q = Waszil(),您应该一切顺利。

编辑:如果您更喜欢在 Waszil 类中实现固有的线程安全,您可以threading.Lock像这样使用:

from threading import Lock

class Waszil(Queue):
    def __init__(self, maxsize=0):
        super().__init__(self)
        self.threadLock = Lock()
        self.maxsize = maxsize
        self._init(maxsize)
        self.endtime = 0

    def increment_endtime(self):
        with self.threadLock:
            self.endtime += 1

在这种情况下,而不是您的

with q.mutex:
    q.endtime += TIMEOUT    

你只需打电话q.increment_endtime()


推荐阅读