首页 > 解决方案 > PyQt5 与 Python 3.8 中的多进程共享内存

问题描述

我有点困惑:为什么下面的代码没有跨进程更新 numpy 数组?它没有抛出任何错误,但主进程没有反映出子进程对共享数组 sample 的更新。我不确定为什么这不起作用。shared_memory 文档页面上的示例在我的机器上运行良好,共享的 Event 也可以正常工作。这可能是与 Qt 相关的问题吗?

import time
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QProgressBar, QPushButton
import multiprocessing
from multiprocessing import shared_memory
import numpy as np

# Number of rows in the shared sample array; the array shape is (DATA_SIZE, 1).
DATA_SIZE = 1
# Element type of the shared sample array.
DTYPE = np.int64


class DataPuller(multiprocessing.get_context("spawn").Process):
    """Spawn-context process that writes a wrapping counter into a shared array."""

    def __init__(
            self,
            event,
            shared_mem_name,
    ):
        """Store the stop event and attach to the named shared-memory block.

        Args:
            event: Event polled by run(); the loop exits once it is set.
            shared_mem_name: Name of an already existing shared-memory block.
        """
        super().__init__()
        self.event = event
        # NOTE(review): __init__ is invoked where DataPuller(...) is
        # constructed (Window.__init__ in this file), so this attach and the
        # array below are set up before start() is called -- verify what the
        # spawned child actually receives for self.sample.
        self.shm = shared_memory.SharedMemory(name=shared_mem_name)
        self.sample = np.ndarray(
            shape=(DATA_SIZE, 1), dtype=DTYPE, buffer=self.shm.buf)
        self.counter = 0

    def run(self):
        """Write the counter into column 0 of the array until the event is set.

        Each iteration sleeps 0.1 s, stores the counter, prints the stored
        value and advances the counter, wrapping back to 0 at 1000. After the
        loop the shared-memory handle is closed.
        """
        print("child process started")
        while not self.event.is_set():
            time.sleep(0.1)
            self.sample[:, 0] = self.counter
            print("child: ", self.sample[0, 0])
            self.counter += 1
            if self.counter >= 1000:
                self.counter = 0
        self.shm.close()
        print("child process finished")


class Window(QWidget):
    """Widget with a progress bar that displays the value in the shared array."""

    def __init__(self, *args, **kwargs):
        """Build the UI, create the shared-memory block and the worker process."""
        super(Window, self).__init__(*args, **kwargs)
        layout = QVBoxLayout(self)
        self.progressBar = QProgressBar(self)
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.onTimer)
        # Same upper bound as the counter wrap-around in DataPuller.run().
        self.progressBar.setRange(0, 1000)
        layout.addWidget(self.progressBar)
        layout.addWidget(QPushButton('开启线程', self, clicked=self.onStart))

        self.counter = 0
        self.shared_mem_name = "data"
        # Template array: only its nbytes, shape and dtype are used below.
        self.base_array = np.zeros((DATA_SIZE, 1), dtype=DTYPE)
        self.shm = shared_memory.SharedMemory(
            create=True,
            size=self.base_array.nbytes,
            name=self.shared_mem_name)
        # Array view over the shared-memory buffer created above.
        self.sample = np.ndarray(
            shape=self.base_array.shape,
            dtype=self.base_array.dtype,
            buffer=self.shm.buf)
        # This Event comes from the default multiprocessing context, while
        # DataPuller subclasses the "spawn" context's Process.
        self.event = multiprocessing.Event()
        self._process = DataPuller(self.event, self.shared_mem_name)

    def onStart(self):
        """Start the worker process and the 500 ms polling timer if not running."""
        if not self._process.is_alive():
            print("main starting process")
            self._process.start()
            self.timer.start(500)
        else:
            pass

    def onTimer(self):
        """Print the current shared value and show it on the progress bar."""
        print("main: ", self.sample[0, 0])
        self.progressBar.setValue(self.sample[0, 0])

    def closeEvent(self, event):
        """Stop the worker if it is alive, then close and unlink the shared memory."""
        if self._process.is_alive():
            # Ask the worker loop to exit and wait for the process to end.
            self.event.set()
            self._process.join()
        self.shm.close()
        self.shm.unlink()
        self.close()
        print("main process finished")


if __name__ == '__main__':
    # Script entry point: build the Qt application, show the window and run
    # the event loop; the loop's return value becomes the exit status.
    import sys
    from PyQt5.QtWidgets import QApplication

    application = QApplication(sys.argv)
    main_window = Window()
    main_window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)

标签: python pyqt

解决方案


第一个问题是 Event 不属于 DataPuller 所使用的(spawn)上下文,因此子进程永远不会进入 while 循环。解决方案是使用同一个上下文来创建事件。

第二个问题是 DataPuller 的 __init__ 方法是在创建共享内存的初始(主)进程中执行的,因此在其中建立的共享内存映射对辅助的 DataPuller 子进程不起作用。这种情况下的解决方案是在 run 方法中打开(附加到)共享内存。

import time
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QWidget, QVBoxLayout, QProgressBar, QPushButton
import multiprocessing
from multiprocessing import shared_memory
import numpy as np

# Number of rows in the shared sample array; the array shape is (DATA_SIZE, 1).
DATA_SIZE = 1
# Element type of the shared sample array.
DTYPE = np.int64

# One "spawn" context shared by the worker Process class and the stop Event.
context = multiprocessing.get_context("spawn")


class DataPuller(context.Process):
    """Worker process that publishes a wrapping counter through shared memory.

    Only the stop event and the block name are stored at construction time;
    the shared-memory block is opened inside run(), i.e. in the worker itself.
    """

    def __init__(
        self, event, shared_mem_name,
    ):
        """Remember the stop event and the name of the shared-memory block.

        Args:
            event: Event polled by run(); the loop exits once it is set.
            shared_mem_name: Name of an already existing shared-memory block.
        """
        super().__init__()
        self.event = event
        self.shared_mem_name = shared_mem_name

    def run(self):
        """Attach to the shared block and update it until the event is set.

        Each iteration sleeps 0.1 s, stores the counter in column 0 of the
        array, prints the stored value and advances the counter, wrapping
        back to 0 at 1000.
        """
        self.shm = shared_memory.SharedMemory(name=self.shared_mem_name)
        try:
            self.sample = np.ndarray(shape=(DATA_SIZE, 1), dtype=DTYPE, buffer=self.shm.buf)
            self.counter = 0

            while not self.event.is_set():
                time.sleep(0.1)
                self.sample[:, 0] = self.counter
                print("child: ", self.sample[0, 0])
                self.counter += 1
                if self.counter >= 1000:
                    self.counter = 0
        finally:
            # Release this process's handle even if the loop raised, and drop
            # the array first so nothing keeps pointing at the closed buffer.
            self.sample = None
            self.shm.close()
        print("child process finished")


class Window(QWidget):
    """Widget with a progress bar that displays the value in the shared array.

    The window owns the shared-memory block: it creates the block in
    __init__ and closes and unlinks it in closeEvent().
    """

    def __init__(self, *args, **kwargs):
        """Build the UI, create the shared-memory block and the worker process."""
        super(Window, self).__init__(*args, **kwargs)
        layout = QVBoxLayout(self)
        self.progressBar = QProgressBar()
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.onTimer)
        # Same upper bound as the counter wrap-around in DataPuller.run().
        self.progressBar.setRange(0, 1000)
        layout.addWidget(self.progressBar)
        layout.addWidget(QPushButton("开启线程", clicked=self.onStart))

        self.counter = 0
        self.shared_mem_name = "data"
        # Template array that fixes the size, shape and dtype of the block.
        self.base_array = np.zeros((DATA_SIZE, 1), dtype=DTYPE)
        self.shm = shared_memory.SharedMemory(
            create=True, size=self.base_array.nbytes, name=self.shared_mem_name
        )
        self.sample = np.ndarray(
            shape=self.base_array.shape,
            dtype=self.base_array.dtype,
            buffer=self.shm.buf,
        )
        # Give the shared block a defined initial content (the template's zeros).
        self.sample[:] = self.base_array
        # The event is created from the same context as the worker process.
        self.event = context.Event()
        self._process = DataPuller(self.event, self.shared_mem_name)

    def onStart(self):
        """Start the worker process and the 500 ms polling timer if not running."""
        if not self._process.is_alive():
            print("main starting process")
            self._process.start()
            self.timer.start(500)

    def onTimer(self):
        """Print the current shared value and show it on the progress bar."""
        print("main: ", self.sample[0, 0])
        # Convert the NumPy scalar to a plain int before handing it to Qt.
        self.progressBar.setValue(int(self.sample[0, 0]))

    def closeEvent(self, event):
        """Stop polling and the worker, then release the shared memory.

        Args:
            event: The close event delivered to the widget; it is accepted.
        """
        # Stop the timer first so onTimer() cannot read self.sample after the
        # buffer behind it has been closed.
        self.timer.stop()
        if self._process.is_alive():
            # Ask the worker loop to exit and wait for the process to end.
            self.event.set()
            self._process.join()
        self.shm.close()
        self.shm.unlink()
        # The widget is already closing here, so it is enough to accept the
        # event instead of calling close() again.
        event.accept()
        print("main process finished")


if __name__ == "__main__":
    # Script entry point: build the Qt application, show the window and run
    # the event loop; the loop's return value becomes the exit status.
    import sys
    from PyQt5.QtWidgets import QApplication

    application = QApplication(sys.argv)
    main_window = Window()
    main_window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)

推荐阅读