python - 如何在python中正确实现生产者消费者
问题描述
我在生产者消费者模式中有两个线程。当消费者接收到数据时,它会调用一个耗时的函数expensive()
,然后进入一个for
循环。
但是如果消费者在工作时有新数据到达,它应该中止当前工作,(退出循环)并从新数据开始。
我尝试使用 queue.Queue ,如下所示:
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
expensive(d)
for i in range(10000):
...
if not q.empty():
break
但是这段代码的问题是,如果生产者放数据的速度太快,队列有很多项目,消费者将执行expensive(d)
调用加上一个循环迭代,然后对每个项目中止,这很耗时。代码应该可以工作,但没有优化。
解决方案
如果不修改expensive
一个解决方案中的代码,可以将其作为一个单独的进程运行,这将使您能够提前终止它。但是,由于没有提及expensive
运行多长时间,这可能会或可能不会更节省时间。
import multiprocessing as mp
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
exp = mp.Thread(target=expensive, args=(d,))
for i in range(10000):
...
if not q.empty():
exp.terminate() # or exp.kill()
break
推荐阅读
- mysql - 尝试插入值时导致错误的外键
- null - 带有细化列的表未显示缺失值的“Null”(Google 数据工作室)
- java - 具有 AutoValue 生成类的外部 jar 的问题
- palantir-foundry - 如何防止在 Slate 文档加载时自动运行查询?
- android - 有没有办法通过使用cordova android@6.2.3来定位android-29?
- visual-studio - 从模板打开项目时如何保持相对路径
- csv - 我们如何在 Neo4j 中加载一个包含 1000 个部分文件的 s3 目录
- assembly - 为什么调用函数时不保留值 %r12
- css - 如何在 Bootstrap 4 SVG 日期图标中自定义数字?
- python-3.x - 尝试使用 ssh 在远程 Windows 主机上运行测试时出现 Pytest x-dist INTERNALERROR