首页 > 解决方案 > 使用 Python 条件变量的有界大小队列的生产者消费者

问题描述

我试图了解如何实现一个有界大小的队列以供多个生产者和消费者使用。我有这个代码:

尝试1:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.open = Condition()
        self.closed = Condition()
        self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.closed.release()

    def get(self):
        self.closed.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.open.release()
        return val

我可以进一步简化这一点(例如,只使用一个条件变量或互斥锁)?

更新 A:上面的代码示例只允许将一项放入队列中,直到调用 get 才允许再放入,浪费了缓冲区的其余部分。这是尝试修复它的代码的更新:

尝试 2

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.open = Condition(self.mutex)
        self.closed = Condition(self.mutex)

    def put(self, val):
        self.mutex.acquire()
        while self.curr_size == self.size:
            self.open.wait()
        self.buff[self.end] = val
        self.end = (self.end+1)%self.size
        self.curr_size+=1
        self.closed.notify()
        self.mutex.release()

    def get(self):
        self.mutex.acquire()
        while self.curr_size == 0:
            self.closed.wait()
        val = self.buff[self.start]
        self.start = (self.start+1)%self.size
        self.curr_size-=1
        self.open.notify()
        self.mutex.release()
        return val

更新B:这里生产者阻止消费者,反之亦然,有没有一种方法可以像这里使用信号量一样并发?

尝试 3:

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 1
        self.start = 0
        self.size = size
        self.start_lock = Lock()
        self.end_lock = Lock()
        self.open = Condition(self.end_lock)
        self.closed = Condition(self.start_lock)
        self.start_lock.acquire()

    def size_fn(self):
        return self.end + self.size - self.start if self.end <= self.start else self.end - self.start

    def put(self, val):
        with self.end_lock:
            while size_fn() == self.size:
                self.open.wait()
            self.buff[self.end-1] = val
            self.end = (self.end+1)%self.size
            self.closed.notify()

    def get(self):
        with self.start_lock:
            while size_fn() == 0:
                self.closed.wait()
            val = self.buff[(self.start+1)%self.size]
            self.start = (self.start+1)%self.size
            self.open.notify()
        return val

在这里,生产者和消费者使用不同的互斥锁,但是在函数 size_fn() 期间或之后可能会发生上下文切换,从而导致不必要的等待(当它为空或满时)。但是由于生产者和消费者可以同时运行,因此整体性能似乎有所提高。

更新 C

上述代码存在异常。

所以这是另一种尝试:我首先对信号量进行逆向工程,以使用条件变量构建,如下所示:

class Semaphore:
    def __init__(self, size):
        self.size = size
        self.curr_size = 0
        self.mutex = Lock()
        self.cv = Condition(self.mutex)

    def acquire(self):
        with self.mutex:
            while self.curr_size == self.size:
                self.cv.wait()
            self.curr_size += 1         

    def release(self):
        with self.mutex:
            if self.curr_size == 0:
                raise Exception("Releasing semaphore more times than acquired!")
            self.curr_size-=1
            self.cv.notify()

现在我可以使用这个想法两个生成 Q 实现(使用 semaphores 在这里看到)只使用条件变量:

尝试 4 类 Q: def init (self, size): self.buff = [None]*size self.end = 0 self.start = 0 self.size = size

        self.start_lock = Lock()
        self.end_lock = Lock()

        self.open_mutex = Lock()
        self.open_num = 0
        self.open_cv = Condition(self.open_mutex)

        self.closed_mutex = Lock()
        self.closed_num = self.size
        self.closed_cv = Condition(self.closed_mutex)

    def put(self, val):

            with self.open_mutex:
                while self.open_num == self.size:
                    self.open_cv.wait()
                self.open_num+=1

            with self.end_lock:
                self.buff[self.end] = val
                self.end = (self.end+1)%self.size

            with self.closed_mutex:
                self.closed_num-=1
                self.closed_cv.notify()

    def get(self):
            with self.closed_mutex:
                while self.closed_num == self.size:
                    self.closed_cv.wait()
                self.closed_num+=1

            with self.start_lock:
                val = self.buff[self.start]
                self.start = (self.start+1)%self.size

            with self.open_mutex:
                self.open_num-=1
                self.open_cv.notify()
        return val

让我知道上述实现中是否存在问题。

标签: pythonqueuepython-multithreadingproducer-consumercondition-variable

解决方案


您可能想参考这个:使用条件变量的有界缓冲区示例,以及使用信号量的另一个示例: https ://codeistry.wordpress.com/2018/05/12/thread-safe-buffer-queue-python-code/

使用有界缓冲区的多个生产者消费者: https ://codeistry.wordpress.com/2018/05/13/ordered-producer-consumer-python-code/


推荐阅读