首页 > 解决方案 > 如何在python中正确实现生产者消费者

问题描述

我在生产者消费者模式中有两个线程。当消费者接收到数据时,它会调用一个耗时的函数expensive(),然后进入一个for循环。

但是如果消费者在工作时有新数据到达,它应该中止当前工作,(退出循环)并从新数据开始。

我尝试使用 queue.Queue ,如下所示:

q = queue.Queue()

def producer():
    while True:
        ...
        q.put(d)
      
def consumer():
    while True:
        d = q.get()
        expensive(d)
        for i in range(10000):
            ...
            if not q.empty():
                break
    

但是这段代码的问题是,如果生产者放数据的速度太快,队列有很多项目,消费者将执行expensive(d)调用加上一个循环迭代,然后对每个项目中止,这很耗时。代码应该可以工作,但没有优化。

标签: pythonmultithreadingconcurrencyproducer-consumer

解决方案


如果不修改expensive一个解决方案中的代码,可以将其作为一个单独的进程运行,这将使您能够提前终止它。但是,由于没有提及expensive运行多长时间,这可能会或可能不会更节省时间。

import multiprocessing as mp

q = queue.Queue()


def producer():
    while True:
        ...
        q.put(d)
  
def consumer():
    while True:
        d = q.get()
        exp = mp.Thread(target=expensive, args=(d,))
        for i in range(10000):
            ...
            if not q.empty():
                exp.terminate() # or exp.kill()
                break

推荐阅读