首页 > 解决方案 > 为什么python中基于多线程的pipeline可以提高效率

问题描述

尽管 Python 支持多线程执行,但它GIL会导致其中一个线程一次向前推进。然而,在我阅读了《Effective Python》之后,我在这本书中实现了管道示例。代码如下,整个过程分为3个阶段,分别是下载、调整大小和上传。

from threading import Thread
from queue import Queue
import time

# define operation functions for 3 stages: download, resize and upload
def download(item):
    print("downloading {}".format(item))
    time.sleep(2)
    return item
    
def resize(item):
    print("resizing {}".format(item))
    time.sleep(3)
    return item
    
def upload(item):
    print("uploading {}".format(item))
    time.sleep(5)
    return item

# subclass of Thread
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super(StoppableWorker, self).__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

# subclass of Queue
class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
    
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
output_queue = ClosableQueue()
threads = [StoppableWorker(download, download_queue, resize_queue),
           StoppableWorker(resize, resize_queue, upload_queue),
           StoppableWorker(upload, upload_queue, output_queue)]

for t in threads:
    t.start()

st = time.time()
for i in range(10):
    download_queue.put(i)
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print("It took {}".format(time.time() - st))
print(output_queue.qsize(), 'items finished')

根据输出时间,我发现N个任务花费的时间是10 + (N-1)*5秒(10 = 2(下载阶段)+ 3(调整大小阶段)+ 5(上传阶段))。看起来这3个线程可以同时工作,这与上面的描述相矛盾的是,一次只有一个线程可以向前推进

标签: pythonmultithreadingqueuepipeline

解决方案


  • I/O 任务中的线程:

线程是改变游戏规则的,因为许多与网络/数据 I/O 相关的脚本大部分时间都在等待来自远程源的数据。因为下载可能没有链接(即,抓取单独的网站),处理器可以从不同的数据源并行下载并在最后组合结果。对于 CPU 密集型进程,使用 threading 模块几乎没有什么好处。


推荐阅读