python - Python线程,Event和Queue如何协同工作?
问题描述
在看了比斯利书中的例子后,我正在和我的朋友交谈
class ActorExit(Exception):
pass
class Actor:
def __init__(self):
self._mailbox = Queue()
def send(self, msg):
self._mailbox.put(msg)
def recv(self):
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg
def close(self):
self.send(ActorExit)
def start(self):
self._terminated = Event()
t = Thread(target=self._bootstrap)
t.daemon = True
t.start()
def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()
def join(self):
self._terminated.wait()
def run(self):
while True:
msg = self.recv()
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)
我的朋友认为 Event 的唯一目的是阻塞主线程,直到另一个线程执行 set 操作。真的吗?我们如何观察线程执行?
解决方案
Python线程,Event和Queue如何协同工作?
他们没有。您可以使用没有队列的事件和没有事件的队列,彼此之间没有依赖关系。您的示例恰好同时使用了两者。
我的朋友认为 Event 的唯一目的是阻塞主线程,直到另一个线程执行 set 操作。真的吗?
调用.wait()
事件对象将阻塞任何调用线程,直到内部标志为.set()
.
如果您查看 Event 的源代码,您会发现 Event 仅包含一个带锁的 Condition 变量和一个布尔标志 + 用于处理和传达(与等待线程)该标志状态更改的方法。
class Event:
"""Class implementing event objects.
Events manage a flag that can be set to true with the set() method and reset
to false with the clear() method. The wait() method blocks until the flag is
true. The flag is initially false.
"""
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
...
我们如何观察线程执行?
一个简单的方法是应用某种实用函数来打印出您感兴趣的内容,例如:
def print_info(info=""):
"""Print calling function's name and thread with optional info-text."""
calling_func = sys._getframe(1).f_code.co_name
thread_name = threading.current_thread().getName()
print(f"<{thread_name}, {calling_func}> {info}", flush=True)
另一种可能性是在这个答案中使用日志记录。
不确定 Beazly 想用您展示的代码演示什么,但对于这个简单的任务,我认为它有点过度设计。当您已经使用队列时,无需在此处添加事件。您可以通过传递一个标记值来初始化线程终止。
这是您的示例的简化版本,带有哨兵('STOP')和一些print_info
来自上面的信息打印:
import sys
import time
import threading
from queue import Queue
class Actor(threading.Thread):
def __init__(self):
super().__init__(target=self.run)
self.queue = Queue()
def send(self, msg):
self.queue.put(msg)
print_info(f"sent: {msg}") # DEBUG
def close(self):
print_info() # DEBUG
self.send('STOP')
def run(self):
for msg in iter(self.queue.get, 'STOP'):
pass
class PrintActor(Actor):
def run(self):
for msg in iter(self.queue.get, 'STOP'):
print_info(f"got: {msg}") # DEBUG
if __name__ == '__main__':
pa = PrintActor()
pa.start()
pa.send("Hello")
time.sleep(2)
pa.send("...World!")
time.sleep(2)
pa.close()
pa.join()
输出:
<MainThread, send> sent: Hello
<Thread-1, run> got: Hello
<MainThread, send> sent: ...World!
<Thread-1, run> got: ...World!
<MainThread, close>
<MainThread, send> sent: STOP
推荐阅读
- python - if 块的第一个条件被跳过
- awk - awk比lz4解压慢
- pointers - 指针是多态的一个例子吗?
- google-cloud-firestore - 如何插入具有唯一 GUID 的新 Firestore 集合文档?
- python - 使用带有烧瓶的 Authlib 获取和存储刷新令牌
- python - 如果在另一个嵌套列表中找不到,则删除嵌套列表项?
- c++ - “静态 char __ = []() -> char”的含义
- javascript - Chrome 的 JavaScript 分析器中的“任务”需要更长的时间,但没有调用任何函数
- github - Github API - 如何知道一个问题是否被分叉拉取请求关闭?
- c# - 我在 Visual Studio C# 中的项目只响应 D 而不是其他键