python-3.x - 生产者消费者消息共享在多处理中不起作用
问题描述
我正在尝试运行一个场景,其中我有一个生产者从网络摄像头捕获帧并将其放入队列中。然后消费者从输入队列中读取图像并进行一些处理并将o / p图像放入传出队列中。
问题是,消费者从队列中读取似乎没有阻塞。理想情况下它应该阻塞;而且当它从队列中读取值时,sys.getsizeof 报告的大小总是恒定的 128,我当时认为这是错误的——我确信我放入队列的图像要大得多。
from __future__ import print_function

import logging
import multiprocessing
import pickle
import queue
import sys
import time

import cv2
import numpy as np
class Consumer(multiprocessing.Process):
    """Worker process: takes frames from incoming_q, "processes" them
    (placeholder: a 1 s sleep), and forwards them to outgoing_q for display.

    Runs forever; the caller is expected to mark it as a daemon so it dies
    with the main process.
    """

    def __init__(self, incoming_q, outgoing_q):
        multiprocessing.Process.__init__(self)
        self.outgoing_q = outgoing_q
        self.incoming_q = incoming_q

    def run(self):
        proc_name = self.name
        print(f"{proc_name} - inside process_feed..starting")
        while True:
            # Blocking get: waits until the producer has put a frame.
            image_np = self.incoming_q.get(True)
            # NOTE: sys.getsizeof is shallow — for a numpy array it reports
            # only the array object's header (~128 bytes), NOT the pixel
            # buffer. A value of 128 here does not mean the frame is empty.
            size_of_img = sys.getsizeof(image_np)
            if size_of_img > 128:
                print(f"{proc_name} - size image=>{size_of_img}")
            time.sleep(1)  # simulate per-frame processing cost
            try:
                self.outgoing_q.put_nowait(image_np)
            except queue.Full:
                # Display queue is full: drop this frame instead of blocking.
                pass
        print("inside process_feed..ending")  # unreachable: loop never exits
class Producer(multiprocessing.Process):
    """Capture process: reads webcam frames into incoming_q and displays any
    processed frames that come back on outgoing_q.

    Owns the cv2.VideoCapture handle and the display window; both are
    released in the finally block. Press 'q' in the window to stop.
    """

    def __init__(self, incoming_q, outgoing_q):
        multiprocessing.Process.__init__(self)
        self.incoming_q = incoming_q
        self.outgoing_q = outgoing_q

    def run(self):
        proc_name = self.name
        print("inside capture_feed")
        stream = cv2.VideoCapture(0)
        try:
            counter = 0
            while True:
                counter += 1
                if counter == 1:
                    if not self.incoming_q.full():
                        grabbed, image_np = stream.read()
                        if grabbed:
                            # getsizeof is shallow for numpy arrays (~128 bytes);
                            # logged only for comparison with the consumer side.
                            size_of_img = sys.getsizeof(image_np)
                            print(f"{proc_name}........B.......=>{self.incoming_q.qsize()}")
                            print(f"{proc_name} - size image=>{size_of_img}")
                            self.incoming_q.put(image_np)
                            print(f"{proc_name}........A.......=>{self.incoming_q.qsize()}")
                    counter = 0
                try:
                    # Non-blocking: show a processed frame only if one is ready.
                    image_np = self.outgoing_q.get_nowait()
                    logging.info("reading value for o/p")
                    cv2.imshow('object detection', image_np)
                except queue.Empty:
                    pass
                if cv2.waitKey(25) & 0xFF == ord('q'):
                    break
        finally:
            # Always release the camera and window, even on an exception.
            stream.release()
            cv2.destroyAllWindows()
            print("inside capture_feed..ending")
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    # NOTE: the camera is opened only inside Producer.run(); opening a second
    # cv2.VideoCapture(0) here (as before) conflicts with the producer's
    # handle and is never released.
    incoming_q = multiprocessing.Queue(maxsize=100)
    outgoing_q = multiprocessing.Queue(maxsize=100)
    logging.info("before start of thread")
    max_process = 1  # number of consumer workers
    producer = Producer(incoming_q, outgoing_q)
    consumers = []
    for _ in range(max_process):
        c = Consumer(incoming_q, outgoing_q)
        # Daemon consumers loop forever; they are killed when main exits.
        c.daemon = True
        consumers.append(c)
    logging.info("inside main thread..middle")
    producer.start()
    for c in consumers:
        c.start()
    logging.info("inside main thread..ending")
    logging.info("waiting in main thread too....")
    # Join ONLY the producer: the daemon consumers never return from run(),
    # so joining them would block forever. The producer exits on 'q'.
    producer.join()
    logging.info("waiting in main thread finished....")
    logging.info("inside main thread..ended")
解决方案
我能够找出我的方法的问题。我忽略了 pickle(序列化)的整个概念。
我将代码更改为序列化 numpy 数组,然后再写入队列并在读取后反序列化。代码按预期开始工作。
另外,对 numpy 数组打印出 128 作为 sys.getsizeof 的结果其实是正常的——它只返回数组对象本身的浅层大小,不包含底层像素缓冲区;我误解了这个数字。
def serialize_ndarray(arr: np.ndarray) -> bytes:
    """Serialize a numpy array to a bytes blob for transport through a
    multiprocessing queue (which pickles objects anyway; doing it explicitly
    makes the payload size observable)."""
    serialized = pickle.dumps(arr)
    return serialized


def deserialize_ndarray(string: bytes) -> np.ndarray:
    """Inverse of serialize_ndarray: rebuild the numpy array from bytes.

    SECURITY: pickle.loads executes arbitrary code from its input — only use
    this on data produced by a trusted process, never on external input.
    """
    data = pickle.loads(string)
    return data
推荐阅读
- arduino - 我如何使用字符串作为 pinname(nodemcu 板)
- console-application - 为什么 ANSI 转义码不能正确显示?
- c++ - c++ 中的每个对象是否包含不同版本的类成员函数?
- c++ - 为什么在 new 运算符的情况下会自动调用析构函数?
- node.js - 为什么节点快递在第一次请求后发送不同的标头
- react-apollo - 设置 ApolloGraphQL 持久化查询时,如何支持 /GraphQL?
- javascript - javascript Audio() 到底是什么,它是如何工作的?它与
- ruby - 哈希键和值
- python - 无法使用 docker compose 连接到 docker 容器上的 neo4j 数据库
- mysql - MySQL 8.0.21 中的 ROLLUP 问题