python - Python 队列:如何延迟/延长/修改阻塞获取的超时?
问题描述
我有一个queue.Queue
由线程填充的。一个方法尝试从这个队列接收超时。现在让我们说,另一个线程可以重置我们队列等待的超时,如果队列没有及时送入,我们的接收器函数应该继续,更新超时。我可以如下实现,但是我必须修改内置queue.Queue
类,以便在等待期间可以修改方法中的endtime
参数......有没有更好的解决方案?get()
(我不想使用异步...)
from threading import Thread
from queue import Queue, Empty
import time
q = Queue()
TIMEOUT = 1
RESET_TIME = 0.5
PUT_TIME = 1.2
t0 = time.time()
def receive():
try:
_res = q.get(block=True, timeout=TIMEOUT)
print(f'get @ {time.time()-t0}')
return _res
except Empty:
print(f'to @ {time.time()-t0}')
return None
def feed_queue():
time.sleep(PUT_TIME)
print(f'put @ {time.time()-t0}')
q.put_nowait(42)
def reset_timeout():
time.sleep(RESET_TIME)
with q.mutex:
q.endtime += TIMEOUT
print(f'reset @ {time.time()-t0}')
if __name__ == '__main__':
Thread(target=feed_queue).start()
Thread(target=reset_timeout).start()
res = receive()
print('res:', res)
这产生:
reset @ 0.5013222694396973
put @ 1.201164722442627
get @ 1.201164722442627
res: 42
在 queue.py 中进行了以下修改以使其正常工作:
Index: queue.py
===================================================================
--- queue.py (revision 28725)
+++ queue.py (working copy)
@@ -52,6 +52,7 @@
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0
+ self.endtime = 0
def task_done(self):
'''Indicate that a formerly enqueued task is complete.
@@ -171,9 +172,9 @@
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
- endtime = time() + timeout
+ self.endtime = time() + timeout
while not self._qsize():
- remaining = endtime - time()
+ remaining = self.endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
解决方案
您可以创建自己的类,继承Queue
和添加全局变量endtime
,如下所示:
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
然后只需更改q = Queue()
为q = Waszil()
,您应该一切顺利。
编辑:如果您更喜欢在 Waszil 类中实现固有的线程安全,您可以threading.Lock
像这样使用:
from threading import Lock
class Waszil(Queue):
def __init__(self, maxsize=0):
super().__init__(self)
self.threadLock = Lock()
self.maxsize = maxsize
self._init(maxsize)
self.endtime = 0
def increment_endtime(self):
with self.threadLock:
self.endtime += 1
在这种情况下,而不是您的
with q.mutex:
q.endtime += TIMEOUT
你只需打电话q.increment_endtime()
推荐阅读
- iis - Vuejs 部署问题:IIS、路由
- javascript - ReactJS:解析csv日期
- python - 如何根据规则提取唯一行?
- unix - awk 用列分隔符写入文件
- python - 如何使用 Python 从网页中提取 td 值?
- python - 在 matplotlib.pyplot 的 x 轴上设置日志间隔
- sql-server - SQL Server Express 2014 - @@version 2008
- azure - 由于 WAF 不支持,WAF 与负载均衡器的替代用法?
- batch-file - 在这种情况下,xcopy 整个文件夹
- c# - 建议的操作等待用户首先选择