python - 使用多处理从队列中读取
问题描述
这是我使用 Python 多处理从队列中填充和读取的代码:
from multiprocessing import Lock, Process, Queue, Pool
import time
from random import randint
def add_to_queue(tasks_to_accomplish, name):
while True:
random_int = randint(0, 22)
print('name', name , "adding" , random_int)
tasks_to_accomplish.put(random_int)
time.sleep(2)
def read_from_queue(tasks_to_accomplish, name):
while True:
item = tasks_to_accomplish.get()
print('name' , name , item)
time.sleep(.01)
if __name__ == '__main__':
tasks_to_accomplish = Queue()
p = Process(target=add_to_queue, args=(tasks_to_accomplish, "p"))
p.start()
p2 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p2"))
p2.start()
p3 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p3"))
p3.start()
p.join()
p2.join()
p3.join()
代码将无限执行,这里是部分输出:
name p adding 3
name p2 3
name p adding 4
name p3 4
name p adding 0
name p2 0
name p adding 22
name p3 22
name p adding 2
name p2 2
name p adding 13
name p3 13
name p adding 0
name p2 0
name p adding 14
name p3 14
name p adding 20
name p2 20
name p adding 4
name p3 4
从队列中读取所用时间为 0.01 秒:time.sleep(.01)
。但是 p2 和 p3 进程似乎没有在 0.01 秒内读取线程,因为很明显它们阻塞了超过 0.01 秒。我是否正确实现了进程线程以从队列中读取?
解决方案
正如 Daniel 指出的那样,Queue.get()默认情况下将阻塞直到数据可用。
您可以使用它q.get(block=True)
来更改它,尽管这会引发异常:
name p adding 12
name p2 12
Process Process-6:
Traceback (most recent call last):
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
item = tasks_to_accomplish.get(block=False)
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
raise Empty
_queue.Empty
Process Process-5:
Traceback (most recent call last):
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
item = tasks_to_accomplish.get(block=False)
File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
raise Empty
_queue.Empty
name p adding 2
name p adding 12
name p adding 14
name p adding 21
name p adding 9
name p adding 13
您需要:
def read_from_queue(tasks_to_accomplish, name):
while True:
try:
item = tasks_to_accomplish.get(block=False)
except:
print('no data for', name)
else:
print('name' , name , item)
time.sleep(.01)
要得到:
name p adding 0
name p2 0
no data for p3
no data for p3
no data for p2
no data for p2
no data for p3
no data for p2
no data for p3
# about 350 more entries like this
name p adding 5
no data for p2
name p3 5
no data for p2
no data for p3
no data for p3
no data for p2
no data for p3
# ...
除非您需要在两次读取之间做一些工作,否则我会说是的,您已经正确实现了读取过程(并且您可以sleep
在读取时安全地删除调用)。
推荐阅读
- r - R:跨excel文件的条件格式
- amazon-web-services - EBS 卷附加到正在运行的 EC2 Windows 实例
- string - 具有同时和顺序替换的 sed
- node.js - 如何使用 node ssh2 启动用户交互式 ssh 会话?
- scala - 如何在 AWS 上执行一个简单的 Jar 以对 S3 存储桶执行一些操作
- visual-studio-code - 使用大量 CPU 的 Visual Studio Code
- ios - IOS/Objective-C:从共享实例更改视图控制器中的属性?
- regex - 使用正则表达式,选择前导斜杠但仅当字符串为 2 或更多时?
- android - 带有 GridLayoutManager 的 RecyclerView 中的项目的高度不是 wrap_content
- python - 如何将字符串添加到 Python 3 中文本文件中读取的字符串?