python - 为什么python中基于多线程的pipeline可以提高效率
问题描述
尽管 Python 支持多线程执行,但它GIL
会导致其中一个线程一次向前推进。然而,在我阅读了《Effective Python》之后,我在这本书中实现了管道示例。代码如下,整个过程分为3个阶段,分别是下载、调整大小和上传。
from threading import Thread
from queue import Queue
import time
# define operation functions for 3 stages: download, resize and upload
def download(item):
print("downloading {}".format(item))
time.sleep(2)
return item
def resize(item):
print("resizing {}".format(item))
time.sleep(3)
return item
def upload(item):
print("uploading {}".format(item))
time.sleep(5)
return item
# subclass of Thread
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super(StoppableWorker, self).__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
# subclass of Queue
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return
yield item
finally:
self.task_done()
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
output_queue = ClosableQueue()
threads = [StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, output_queue)]
for t in threads:
t.start()
st = time.time()
for i in range(10):
download_queue.put(i)
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print("It took {}".format(time.time() - st))
print(output_queue.qsize(), 'items finished')
根据输出时间,我发现N个任务花费的时间是10 + (N-1)*5
秒(10 = 2(下载阶段)+ 3(调整大小阶段)+ 5(上传阶段))。看起来这3个线程可以同时工作,这与上面的描述相矛盾的是,一次只有一个线程可以向前推进
解决方案
- I/O 任务中的线程:
线程是改变游戏规则的,因为许多与网络/数据 I/O 相关的脚本大部分时间都在等待来自远程源的数据。因为下载可能没有链接(即,抓取单独的网站),处理器可以从不同的数据源并行下载并在最后组合结果。对于 CPU 密集型进程,使用 threading 模块几乎没有什么好处。
推荐阅读
- python - 无法安装我自己的 PyPi 包:无法满足要求
- javascript - JS/HTML Date picker enforce rule to ensure input is today
- r - 如果行中的变量在 R 中匹配或不匹配,如何将 1 和 0 分配给列
- reactjs - 我正在尝试使用 redux 实现撤消/重做——为什么当我更新原始对象时我的对象克隆会更新?
- c - 矩阵乘法:内存分配错误
- python - ValueError:未来属于与指定为循环参数的循环不同的循环
- c - 编写一个程序,计算并显示用户输入的正整数 n 的阶乘
- python-3.x - 如何对列中的字符串值进行分类以获得相关性?(Python)
- php - 使用 model::count laravel 计算行数
- environment-variables - Julia Jump Xpress.jl XPRESSDIR 安装环境变量