python - 具有大量数据的多处理队列导致 _wait_for_tstate_lock
问题描述
当我在 a和 athreading._wait_for_tstate_lock
之间传输休数据时引发异常via 。Process
Thread
multiprocessing.Queue
我的最小工作示例首先看起来有点复杂 - 抱歉。我会解释。原始应用程序将大量(不那么重要)文件加载到 RAM 中。这是在一个单独的过程中完成的以节省资源。主 gui 线程不应该冻结。
GUI 启动一个单独
Thread
的以防止 gui 事件循环冻结。这个分离
Thread
然后开始一个Process
应该做的工作。
a)这Thread
实例化了 a multiprocess.Queue
(注意这是 amultiprocessing
而不是threading
!)
b)这是为了将Process
数据从Process
后端共享到Thread
.
做了
Process
一些工作(3 个步骤)并将.put()
结果放入multiprocessing.Queue
.当
Process
两端Thread
再次接管并从 中收集数据时Queue
,将其存储到自己的属性MyThread.result
中。告诉 GUI主
Thread
循环/线程如果有时间调用回调函数。回调函数 (
MyWindow::callback_thread_finished()
) 从MyWindow.thread.result
.
问题是,如果放入的数据Queue
发生了我不明白的大事 -MyThread
永无止境。我必须通过 Strg+C 取消应用程序。
我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我有一种感觉,我的问题的关键可以在那里找到。请参阅“管道和队列”(Python 3.5 文档)中的两个红色方框。那是完整的输出
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
这是最小的工作示例
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
可能重复但完全不同,IMO 对我的情况没有答案:Thread._wait_for_tstate_lock() never returns。
解决方法
通过修改第 22 行来使用管理器queue = multiprocessing.Manager().Queue()
来解决问题。但我不知道为什么。我提出这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。即使我真的不知道 aManager()
是什么以及它是否有其他(导致问题的)含义。
解决方案
根据您链接到的文档中的第二个警告框,当您在处理队列中的所有项目之前加入进程时,您可能会遇到死锁。因此,启动流程并立即加入它然后处理队列中的项目是错误的步骤顺序。您必须启动该过程,然后接收项目,然后只有在收到所有项目后才能调用 join 方法。定义一些标记值来表示进程已完成通过队列发送数据。None
例如,如果这不是您期望从流程中获得的常规值。
class MyThread(threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
self.result = []
def run(self):
print('Running MyThread...')
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
while True:
process_result = queue.get()
if process_result is None:
break
self.result.append(process_result)
process.join()
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x' * 102048))
self.queue.put(None)
print('MyProcess stoppd.')
推荐阅读
- c# - Application.ThreadException 在 try... catch 块之前触发
- selenium - Link 有 Javascript 时无法通过 XPath 找到元素
- alluxio - 关于Alluxio中level0.dirs.quota和alluxio.user.file.write.tier.default的配置问题
- powershell - 执行 power shell 脚本时出错
- gradle - 更改flutter使用的默认gradle文件夹路径
- c++ - 链接 OpenCV 失败,LNK 2019 未解析的外部符号
- javascript - 如何使用 Next.js 实现 Quill 或 Draft.js 等富文本编辑器?
- powershell - 使用 Powershell 脚本创建任务计划程序
- python - 如何删除“。” 和字符串中的“-”?
- python - 使用TCP同时在两个进程之间进行多处理连接