python - 使用 Python 条件变量的有界大小队列的生产者消费者
问题描述
我试图了解如何实现一个有界大小的队列以供多个生产者和消费者使用。我有这个代码:
尝试1:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.open = Condition()
self.closed = Condition()
self.closed.acquire()
def put(self, val):
self.open.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.closed.release()
def get(self):
self.closed.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.open.release()
return val
我可以进一步简化这一点(例如,只使用一个条件变量或互斥锁)?
更新 A:上面的代码示例只允许将一项放入队列中,直到调用 get 才允许再放入,浪费了缓冲区的其余部分。这是尝试修复它的代码的更新:
尝试 2
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 0
self.start = 0
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.open = Condition(self.mutex)
self.closed = Condition(self.mutex)
def put(self, val):
self.mutex.acquire()
while self.curr_size == self.size:
self.open.wait()
self.buff[self.end] = val
self.end = (self.end+1)%self.size
self.curr_size+=1
self.closed.notify()
self.mutex.release()
def get(self):
self.mutex.acquire()
while self.curr_size == 0:
self.closed.wait()
val = self.buff[self.start]
self.start = (self.start+1)%self.size
self.curr_size-=1
self.open.notify()
self.mutex.release()
return val
更新B:这里生产者阻止消费者,反之亦然,有没有一种方法可以像这里使用信号量一样并发?
尝试 3:
class Q:
def __init__(self, size):
self.buff = [None]*size
self.end = 1
self.start = 0
self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open = Condition(self.end_lock)
self.closed = Condition(self.start_lock)
self.start_lock.acquire()
def size_fn(self):
return self.end + self.size - self.start if self.end <= self.start else self.end - self.start
def put(self, val):
with self.end_lock:
while size_fn() == self.size:
self.open.wait()
self.buff[self.end-1] = val
self.end = (self.end+1)%self.size
self.closed.notify()
def get(self):
with self.start_lock:
while size_fn() == 0:
self.closed.wait()
val = self.buff[(self.start+1)%self.size]
self.start = (self.start+1)%self.size
self.open.notify()
return val
在这里,生产者和消费者使用不同的互斥锁,但是在函数 size_fn() 期间或之后可能会发生上下文切换,从而导致不必要的等待(当它为空或满时)。但是由于生产者和消费者可以同时运行,因此整体性能似乎有所提高。
更新 C
上述代码存在异常。
所以这是另一种尝试:我首先对信号量进行逆向工程,以使用条件变量构建,如下所示:
class Semaphore:
def __init__(self, size):
self.size = size
self.curr_size = 0
self.mutex = Lock()
self.cv = Condition(self.mutex)
def acquire(self):
with self.mutex:
while self.curr_size == self.size:
self.cv.wait()
self.curr_size += 1
def release(self):
with self.mutex:
if self.curr_size == 0:
raise Exception("Releasing semaphore more times than acquired!")
self.curr_size-=1
self.cv.notify()
现在我可以使用这个想法两个生成 Q 实现(使用 semaphores 在这里看到)只使用条件变量:
尝试 4 类 Q: def init (self, size): self.buff = [None]*size self.end = 0 self.start = 0 self.size = size
self.start_lock = Lock()
self.end_lock = Lock()
self.open_mutex = Lock()
self.open_num = 0
self.open_cv = Condition(self.open_mutex)
self.closed_mutex = Lock()
self.closed_num = self.size
self.closed_cv = Condition(self.closed_mutex)
def put(self, val):
with self.open_mutex:
while self.open_num == self.size:
self.open_cv.wait()
self.open_num+=1
with self.end_lock:
self.buff[self.end] = val
self.end = (self.end+1)%self.size
with self.closed_mutex:
self.closed_num-=1
self.closed_cv.notify()
def get(self):
with self.closed_mutex:
while self.closed_num == self.size:
self.closed_cv.wait()
self.closed_num+=1
with self.start_lock:
val = self.buff[self.start]
self.start = (self.start+1)%self.size
with self.open_mutex:
self.open_num-=1
self.open_cv.notify()
return val
让我知道上述实现中是否存在问题。
解决方案
您可能想参考这个:使用条件变量的有界缓冲区示例,以及使用信号量的另一个示例: https ://codeistry.wordpress.com/2018/05/12/thread-safe-buffer-queue-python-code/
使用有界缓冲区的多个生产者消费者: https ://codeistry.wordpress.com/2018/05/13/ordered-producer-consumer-python-code/
推荐阅读
- azure-keyvault - Key Vault 中的访问控制和访问策略之间的区别
- python - {Python 3.8} setuptools 上的 Pip 命令错误
- node.js - Node JS蓝牙模块安装错误
- html - 如何让这个 flex-direction-column 布局在 Mobile Safari 中工作?
- azure - 有人能解释一下三角洲湖的“黄金”表是如何被推入数据仓库的吗?有没有办法从增量格式转换为数据框?
- php - 使用 OAUTH2 登录用户进行 Discord,然后加入 Guild
- python - 如何增强循环中数据框的构建?
- java - 导入Android“包”会导入资源吗?
- opencv - 在 OpenCV 中导入 AutoML 边缘训练模型
- java - 是否可以编写 AOP 注释?