首页 > 解决方案 > 附加到同一个锁的两个条件变量,python2 和 python3 中的不同行为

问题描述

我正在尝试用 python 编写经典的生产者-消费者程序。这是我引用的 c 代码: http ://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture16.html https://web.stanford.edu/~ouster/cgi-bin/cs140- spring14/lecture.php?topic=锁

之后 我在 lubuntu 18.04 上运行这个程序pip install coloredpip3 install colored当作为“python3 producer-consumer.py”运行(即使用python 3.6.7运行)时,程序在几次迭代后挂起

"queue is empty, stop consuming"

"queue is full, stop producing"

注意: ctrl-c 不会杀死程序。您需要按 ctrl-z 然后 kill -9 %1 来杀死它。

奇怪的是:当作为“python producer-consumer.py”运行(即使用python 2.7.15rc1 运行)时,它几乎按预期运行。但是在运行足够长的时间后,它会在以下位置引发 IndexError 异常

queue.append(item)

item = queue.pop(0)

在此之前,它按预期运行了好几分钟:3 个不同颜色的生产者和 3 个消费者在同一个小容量队列上工作,经常碰到空队列和满队列的情况。

我怀疑无论我的程序是否正确,python2 和 python3 中的不同行为似乎表明条件变量的 python3(也可能是 python2)实现中存在错误?或者,对于某些有缺陷的程序,这种差异实际上是预期的吗?提前致谢。

from threading import Thread, Lock, Condition
import time
from random import random, randint
import colored
from colored import stylize

queue = []
CAPACITY = 3

qlock = Lock()
item_ok = Condition(qlock)
space_ok = Condition(qlock)

class ProducerThread(Thread):
    def run(self):
        global queue
        mycolor = self.name
        while True:
            qlock.acquire()
            if len(queue) >= CAPACITY:
                print(stylize('queue is full, stop producing', colored.fg(mycolor)))
                while space_ok.wait():
                    pass
                print(stylize('space available again, start producing', colored.fg(mycolor)))
            item = chr(ord('A')+randint(0,25))
            print(stylize('['+' '.join(queue)+'] <= '+item, colored.fg( mycolor)))
            queue.append(item)
            item_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)


class ConsumerThread(Thread):
    def run(self):
        global queue
        mycolor = self.name
        while True:
            qlock.acquire()
            if not queue:
                print(stylize('queue is empty, stop consuming', colored.fg(mycolor)))
                while item_ok.wait():
                    pass
                print(stylize('food is available, start consuming', colored.fg(mycolor)))
            item = queue.pop(0)
            print(stylize(item+' <= ['+' '.join(queue)+']', colored.fg( mycolor)))
            space_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)

ProducerThread(name='red').start()
ProducerThread(name='green').start()
ProducerThread(name='blue').start()
ConsumerThread(name='cyan').start()
ConsumerThread(name='magenta').start()
ConsumerThread(name='yellow').start()

运行 python2 几分钟后

标签: pythonpython-3.xpython-multithreading

解决方案


主要问题是您的代码是在通知线程您没有检查列表是否为空/满。在以下情况下,这可能是一个问题:

c1并且c2是消费者线程,p1是生产者线程。队列一开始是空的。c1正在time.sleep...等待c2通知(在while item_ok.wait():.

  1. p1将项目添加到队列并调用item_ok.notify()
  2. c1完成等待并获取锁
  3. c2收到通知并尝试获取锁
  4. c1消费队列中的项目并释放锁
  5. c2获取锁并尝试从空队列中弹出

解决方案

.wait()而不是在 while 条件中调用(这是无意义的,因为它总是None在 Python 2 上返回并且总是True在 Python 3.2+ 上,请参见此处),而是.wait()在 while 循环体中调用并放置条件是否队列未满/空在while循环条件下:

while not queue:
    print('queue is empty, stop consuming')
    item_ok.wait()
    print('trying again')

通过使用这种方法(在上面链接的文档中也使用过),线程在唤醒并获取锁后检查队列是否仍然不空/满。如果条件不再满足(因为在其间执行了另一个线程),则线程再次等待条件。

顺便说一句,上面描述的python 2和3之间的区别也是你的程序在两个版本上表现不同的原因。这是记录在案的行为,而不是实现中的错误。

生产者和消费者线程的固定代码(在过去 30 分钟内在我的机器上运行良好)如下所示(我删除了颜色,因为我不想安装包):

class ProducerThread(Thread):
    def run(self):
        global queue
        while True:
            qlock.acquire()
            while len(queue) >= CAPACITY:
                print('queue is full, stop producing')
                space_ok.wait()
                print('trying again')
            item = chr(ord('A')+randint(0,25))
            print('['+' '.join(queue)+'] <= '+item)
            queue.append(item)
            item_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)


class ConsumerThread(Thread):
    def run(self):
        global queue
        while True:
            qlock.acquire()
            while not queue:
                print('queue is empty, stop consuming')
                item_ok.wait()
                print('trying again')
            item = queue.pop(0)
            print(item+' <= ['+' '.join(queue)+']')
            space_ok.notify()
            qlock.release()
            time.sleep((random()+0.2)/1.2)

奖金

您提到无法使用Ctrl-C(KeyboardInterrupt) 退出程序。要解决此问题,您可以使线程成为“守护进程”,这意味着它们会在主线程结束后立即退出。使用上面的代码,Ctrl-C可以很好地结束程序:

ProducerThread(name='red', daemon=True).start()
ProducerThread(name='green', daemon=True).start()
ProducerThread(name='blue', daemon=True).start()
ConsumerThread(name='cyan', daemon=True).start()
ConsumerThread(name='magenta', daemon=True).start()
ConsumerThread(name='yellow', daemon=True).start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Exiting")

这能解决你的问题吗?请在下方发表评论。


推荐阅读