python - PyQt5 与 Python 3.8 中的多进程(multiprocessing)共享内存
问题描述
我有点困惑:为什么下面的代码没有跨进程更新 numpy 数组?它没有抛出任何错误,但主进程没有反映出子进程对共享数组 sample 的更新。
我不确定为什么这不起作用。shared_memory 文档页面上的示例在我的机器上运行良好,
共享的事件(Event)也可以正常工作。这可能是与 Qt 相关的问题吗?
import time
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QProgressBar, QPushButton
import multiprocessing
from multiprocessing import shared_memory
import numpy as np
# Number of rows in the shared sample array; the array shape is (DATA_SIZE, 1).
DATA_SIZE = 1
# Element type used for the shared array in both DataPuller and Window.
DTYPE = np.int64
class DataPuller(multiprocessing.get_context("spawn").Process):
    """Worker process that writes a rolling counter into a shared-memory array.

    NOTE(review): everything in ``__init__`` executes in the process that
    constructs this object, not in the spawned child. With the "spawn" start
    method the instance is pickled over to the child, so the ``self.sample``
    array built here is presumably copied rather than shared -- confirm; this
    would match the symptom described in the question.
    """

    def __init__(
        self,
        event,
        shared_mem_name,
    ):
        """Attach to an existing shared-memory block and wrap it in an array.

        Args:
            event: Event polled by ``run``; the loop ends once it is set.
            shared_mem_name: Name of an already-created shared-memory block.
        """
        super().__init__()
        self.event = event
        self.counter = 0
        self.shm = shared_memory.SharedMemory(name=shared_mem_name)
        self.sample = np.ndarray(
            (DATA_SIZE, 1),
            dtype=DTYPE,
            buffer=self.shm.buf,
        )

    def run(self):
        """Store the counter in column 0 every 0.1 s until the event is set."""
        print("child process started")
        while True:
            if self.event.is_set():
                break
            time.sleep(0.1)
            self.sample[:, 0] = self.counter
            print("child: ", self.sample[0, 0])
            # Advance the counter, wrapping back to 0 once it reaches 1000.
            following = self.counter + 1
            self.counter = 0 if following >= 1000 else following
        self.shm.close()
        print("child process finished")
class Window(QWidget):
    """Widget whose progress bar displays a value read from shared memory."""

    def __init__(self, *args, **kwargs):
        """Build the UI, create the shared-memory block and prepare the worker."""
        super().__init__(*args, **kwargs)

        # Widgets and the polling timer.
        layout = QVBoxLayout(self)
        self.progressBar = QProgressBar(self)
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.onTimer)
        self.progressBar.setRange(0, 1000)
        layout.addWidget(self.progressBar)
        start_button = QPushButton('开启线程', self, clicked=self.onStart)
        layout.addWidget(start_button)

        # Shared-memory block sized after a template array, plus a view onto it.
        self.counter = 0
        self.shared_mem_name = "data"
        self.base_array = np.zeros((DATA_SIZE, 1), dtype=DTYPE)
        template = self.base_array
        self.shm = shared_memory.SharedMemory(
            name=self.shared_mem_name,
            create=True,
            size=template.nbytes,
        )
        self.sample = np.ndarray(
            template.shape,
            dtype=template.dtype,
            buffer=self.shm.buf,
        )

        # NOTE(review): this event comes from the default multiprocessing
        # context while DataPuller derives from the "spawn" context -- confirm
        # the two are compatible on the target platform.
        self.event = multiprocessing.Event()
        self._process = DataPuller(self.event, self.shared_mem_name)

    def onStart(self):
        """Start the worker and the 500 ms polling timer unless already running."""
        if self._process.is_alive():
            return
        print("main starting process")
        self._process.start()
        self.timer.start(500)

    def onTimer(self):
        """Print the first element of the shared array and show it in the bar."""
        print("main: ", self.sample[0, 0])
        latest = self.sample[0, 0]
        self.progressBar.setValue(latest)

    def closeEvent(self, event):
        """Stop the worker if it is running, then release the shared block."""
        worker = self._process
        if worker.is_alive():
            self.event.set()
            worker.join()
        segment = self.shm
        segment.close()
        segment.unlink()
        self.close()
        print("main process finished")
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the window and hand
    # the event loop's return value to the interpreter as the exit status.
    import sys
    from PyQt5.QtWidgets import QApplication

    application = QApplication(sys.argv)
    window = Window()
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)
解决方案
第一个问题是:Event 不是用 DataPuller 所使用的上下文(spawn)创建的,因此子进程永远不会进入 while 循环。解决方案是使用同一个上下文来创建事件(context.Event())。
第二个问题是:DataPuller 的 __init__ 方法是在创建共享内存的初始(主)进程中执行的,因此其中建立的共享内存对象和数组对 DataPuller 子进程并不可用。这种情况下的解决方案是在 run 方法中(即在子进程里)按名称连接到共享内存。
import time
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QProgressBar, QPushButton
import multiprocessing
from multiprocessing import shared_memory
import numpy as np
# Number of rows in the shared sample array; the array shape is (DATA_SIZE, 1).
DATA_SIZE = 1
# Element type used for the shared array in both DataPuller and Window.
DTYPE = np.int64
# Multiprocessing context using the "spawn" start method. The Process subclass
# and the Event passed to it are both created from this one context.
context = multiprocessing.get_context("spawn")
class DataPuller(context.Process):
    """Child process that publishes a rolling counter through shared memory.

    The shared-memory block is attached inside ``run`` -- that is, in the child
    process -- because ``__init__`` executes in the process that constructs the
    object. The constructor therefore stores only the stop event and the name
    of the block.
    """

    def __init__(
        self, event, shared_mem_name,
    ):
        """Remember the stop event and the name of the block to attach to.

        Args:
            event: Event created from the same multiprocessing context as this
                process; ``run`` loops until it is set.
            shared_mem_name: Name of a shared-memory block that must already
                exist when ``run`` begins.
        """
        super().__init__()
        self.event = event
        self.shared_mem_name = shared_mem_name

    def run(self):
        """Write 0, 1, ..., 999, 0, ... into column 0 roughly every 0.1 s.

        The block is attached by name on entry and is always closed on exit,
        including when the loop is interrupted by an exception.

        Raises:
            FileNotFoundError: If no block named ``shared_mem_name`` exists.
        """
        self.shm = shared_memory.SharedMemory(name=self.shared_mem_name)
        try:
            self.sample = np.ndarray(
                shape=(DATA_SIZE, 1), dtype=DTYPE, buffer=self.shm.buf
            )
            self.counter = 0
            # wait() doubles as the 0.1 s pause and returns True as soon as the
            # event is set, so shutdown does not have to sit out a full sleep.
            while not self.event.wait(0.1):
                self.sample[:, 0] = self.counter
                print("child: ", self.sample[0, 0])
                self.counter += 1
                if self.counter >= 1000:
                    self.counter = 0
        finally:
            # The array is only a view onto shm.buf; drop it before closing so
            # nothing can touch the mapping once it has been released.
            self.sample = None
            self.shm.close()
        print("child process finished")
class Window(QWidget):
    """Widget whose progress bar follows a value a child process writes.

    The window owns the shared-memory block: it creates the block in
    ``__init__`` and closes and unlinks it in ``closeEvent``. The child
    process (``DataPuller``) only attaches to the block by name.
    """

    def __init__(self, *args, **kwargs):
        """Build the UI, create the shared block and prepare the worker.

        Raises:
            FileExistsError: If a shared-memory block named ``"data"`` already
                exists (for example, left behind by a run that did not unlink).
        """
        super().__init__(*args, **kwargs)
        layout = QVBoxLayout(self)
        self.progressBar = QProgressBar()
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.onTimer)
        self.progressBar.setRange(0, 1000)
        layout.addWidget(self.progressBar)
        layout.addWidget(QPushButton("开启线程", clicked=self.onStart))
        self.counter = 0
        self.shared_mem_name = "data"
        self.base_array = np.zeros((DATA_SIZE, 1), dtype=DTYPE)
        self.shm = shared_memory.SharedMemory(
            create=True, size=self.base_array.nbytes, name=self.shared_mem_name
        )
        self.sample = np.ndarray(
            shape=self.base_array.shape,
            dtype=self.base_array.dtype,
            buffer=self.shm.buf,
        )
        # Initialise the shared buffer explicitly instead of relying on the
        # contents of a freshly created block.
        self.sample[:] = self.base_array
        # The event must come from the same context as the Process subclass.
        self.event = context.Event()
        self._process = DataPuller(self.event, self.shared_mem_name)

    def onStart(self):
        """Start the worker process and the 500 ms polling timer.

        Does nothing if the worker is already running or the shared block has
        been released. If a previous worker has exited, a new ``DataPuller`` is
        created, because a Process object can be started only once.
        """
        if self.shm is None or self._process.is_alive():
            return
        if self._process.pid is not None:
            # pid is None only before start(); this one has already run.
            self._process = DataPuller(self.event, self.shared_mem_name)
        print("main starting process")
        self._process.start()
        self.timer.start(500)

    def onTimer(self):
        """Print the current shared value and show it in the progress bar."""
        if self.sample is None:
            # Shared memory has already been released; nothing left to read.
            return
        # Convert the NumPy scalar to a built-in int before handing it to Qt.
        value = int(self.sample[0, 0])
        print("main: ", value)
        self.progressBar.setValue(value)

    def closeEvent(self, event):
        """Stop the worker and release the shared block when the window closes.

        Args:
            event: The close event delivered by Qt; it is forwarded to the base
                class implementation.
        """
        # Stop polling first so onTimer can never run against a closed buffer.
        self.timer.stop()
        if self._process.is_alive():
            self.event.set()
            self._process.join()
        if self.shm is not None:
            # Drop the view before closing, and guard against a second
            # closeEvent so unlink() is not called twice.
            self.sample = None
            self.shm.close()
            self.shm.unlink()
            self.shm = None
        print("main process finished")
        # The widget is already closing when this handler runs, so calling
        # self.close() again is unnecessary; defer to the base class instead.
        super().closeEvent(event)
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the window and hand
    # the event loop's return value to the interpreter as the exit status.
    import sys
    from PyQt5.QtWidgets import QApplication

    qt_app = QApplication(sys.argv)
    main_window = Window()
    main_window.show()
    status = qt_app.exec_()
    sys.exit(status)
推荐阅读
- php - 如何传递特定的可选参数?
- c# - 如何使用 onvif.org 从 ONVIF 摄像机获取流
- reactjs - 从反应中的输入获取默认值
- html - 使 HTML 页面移动友好(Wix)
- excel - 使用 VBA .NumberFormat 有没有办法将整列格式化为邮政编码?
- python - Spyder 中的变量浏览器
- reactjs - 了解 React Hooks 'exhaustive-deps' lint 规则
- mongodb - 将 JSON 数据写入 MongoDB 本地的 Spark 作业错误
- android - Autocompletetextview 仅显示项目值(citta),单击时检索 2 个值
- python - 从python中的请求会话创建websocket连接