首页 > 解决方案 > 生产者消费者消息共享在多处理中不起作用

问题描述

我正在尝试运行一个场景,其中我有一个生产者从网络摄像头捕获帧并将其放入队列中。然后消费者从输入队列中读取图像并进行一些处理并将o / p图像放入传出队列中。

问题是,消费者从队列中读取并没有阻塞。理想情况下它应该是,当它从队列中读取值时,大小总是恒定的 128,这是错误的。我确信我放入队列的图像大小要大得多。

from __future__ import print_function

import multiprocessing
import time
import logging
import sys

import cv2


class Consumer(multiprocessing.Process):
    
    def __init__(self, incoming_q, outgoing_q):
        multiprocessing.Process.__init__(self)
        self.outgoing_q = outgoing_q
        self.incoming_q = incoming_q

    def run(self):
        proc_name = self.name
        print(f"{proc_name} - inside process_feed..starting")
        while True:
            #print(f"size of incoming_q=>{self.incoming_q.qsize()}")
            try:
                #print(f"{proc_name} - size of B incoming_q=>{self.incoming_q.qsize()}")
                image_np = self.incoming_q.get(True)
                size_of_img = sys.getsizeof(image_np)
                #print(f"{proc_name} - size of A incoming_q=>{self.incoming_q.qsize()}")
                if size_of_img > 128:
                    print(f"{proc_name} - size image=>{size_of_img}")
                    time.sleep(1)
                    self.outgoing_q.put_nowait(image_np)
            except:
                pass
        print("inside process_feed..ending")


class Producer(multiprocessing.Process):
    
    def __init__(self, incoming_q, outgoing_q):
        multiprocessing.Process.__init__(self)
        self.incoming_q = incoming_q
        self.outgoing_q = outgoing_q

    def run(self):
        proc_name = self.name
        print("inside capture_feed")
        stream = cv2.VideoCapture(0)
        try:
            counter = 0
            while True:
                counter += 1
                if counter == 1:
                    if not self.incoming_q.full():
                        (grabbed, image_np) = stream.read()
                        size_of_img = sys.getsizeof(image_np)
                        print(f"{proc_name}........B.......=>{self.incoming_q.qsize()}")
                        print(f"{proc_name} - size image=>{size_of_img}")
                        self.incoming_q.put(image_np)
                        print(f"{proc_name}........A.......=>{self.incoming_q.qsize()}")
                    counter = 0
                
                try:
                    image_np = self.outgoing_q.get_nowait()
                    logging.info("reading value for o/p")
                    cv2.imshow('object detection', image_np)
                except:
                    pass

                if cv2.waitKey(25) & 0xFF == ord('q'):
                    break
        finally:
            stream.release()
            cv2.destroyAllWindows()
        print("inside capture_feed..ending")
    

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    stream = cv2.VideoCapture(0)
    
    incoming_q = multiprocessing.Queue(maxsize=100)
    outgoing_q = multiprocessing.Queue(maxsize=100)

    logging.info("before start of thread")
    
    max_process = 1
    processes = []    
    processes.append(Producer(incoming_q, outgoing_q))
    for i in range(max_process):
        p = Consumer(incoming_q, outgoing_q)
        p.daemon = True
        processes.append(p)
    logging.info("inside main thread..middle")

    for p in processes:
        p.start()

    logging.info("inside main thread..ending")
    logging.info("waiting in main thread too....")
    logging.info("waiting in main thread finished....")
    for p in processes:
        p.join()
    logging.info("inside main thread..ended")

标签: python-3.xmultiprocessingcv2

解决方案


我能够找出我的方法的问题。我错过了泡菜(序列化)的整个概念。

我将代码更改为序列化 numpy 数组,然后再写入队列并在读取后反序列化。代码按预期开始工作。

也打印 128 作为 sizeof np 数组很好,我误解了这个数字。

    def serialize_ndarray(arr:np.ndarray):
        serialized = pickle.dumps(arr)
        return serialized


    def deserialize_ndarray(string):
        data  = pickle.loads(string)
        return data

推荐阅读