How to use multiprocessing on a large 3D image stack in Python?

Problem description

I have a 3D image stack (4000×2048×2048), and I want to perform some operation on each 2D slice (2048×2048), e.g. Gaussian filtering, image enhancement, resizing, ...

import numpy as np
from tifffile import imread,imwrite
import multiprocessing as mp
import cv2

def gaussian_blur_2d(img):
    blur = cv2.GaussianBlur(img,(5,5),0) 
    return blur

file_path = "F:\\Ctest\\123.tif"
img = imread(file_path)
for i in range(img.shape[0]): #process each 2D slice in place
    img[i,:,:] = gaussian_blur_2d(img[i,:,:])


How can I use multiprocessing to speed up this for loop? My idea is to split the original stack into four or eight chunks and run pool.map over the split stacks, but then how do I combine the processed chunks back into the final full stack? I don't want to write the split stacks to disk, since that adds extra IO time, and in my experience pool.map raises errors on return when the split stacks are too large.
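For reference, the chunked pool.map idea described above would look roughly like the sketch below (blur_chunk and the chunk count of 8 are illustrative, not from the question). Every chunk gets pickled on its way into a worker and again on its way back, which is where the extra time goes, and very large return values are also a known source of pool.map errors:

import numpy as np
from tifffile import imread
import multiprocessing as mp
import cv2

def blur_chunk(chunk): #blur every slice of one sub-stack
    for i in range(chunk.shape[0]):
        chunk[i] = cv2.GaussianBlur(chunk[i], (5, 5), 0)
    return chunk #the whole multi-GB chunk is pickled and shipped back to the parent

if __name__ == "__main__":
    img = imread("F:\\Ctest\\123.tif")
    chunks = np.array_split(img, 8) #views along the first axis, nothing written to disk
    with mp.Pool(8) as pool:
        results = pool.map(blur_chunk, chunks) #each chunk crosses the process boundary twice
    img = np.concatenate(results) #reassemble the full stack in memory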

On the other hand, I tried stuffing the multidimensional array into mp.Array, which gives me TypeError: only size-1 arrays can be converted to Python scalars.
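That TypeError comes from handing mp.Array a multidimensional numpy array where it expects a flat size or a 1D sequence of scalars. A minimal sketch of the usual workaround (assuming uint8 data) allocates a flat shared buffer and wraps it in a numpy view:

import numpy as np
import multiprocessing as mp

shape = (4000, 2048, 2048)
flat = mp.Array('B', shape[0] * shape[1] * shape[2], lock=False) #flat shared uint8 buffer
img = np.frombuffer(flat, dtype="uint8").reshape(shape) #numpy view onto the buffer, no copy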

Tags: python, arrays, numpy, multiprocessing

Solution


As I mentioned in the comments, making all the data accessible across multiple worker processes is the biggest challenge here, because one of the key tenets of multiprocessing is that processes generally do not share memory. We therefore have to explicitly ask the OS for a block of memory that is "shared" between processes, and create our numpy arrays on top of that block. Beyond that, it is fairly standard multiprocessing housekeeping, which is well covered in other tutorials and examples.

import numpy as np
from multiprocessing import Process, shared_memory, Queue, cpu_count
from queue import Empty
import cv2

class STOPFLAG: pass #a simple flag to tell the worker to stop

def worker_process(in_q, shm_name):
    shm = shared_memory.SharedMemory(name=shm_name) #attach to the existing block created by the parent process
    img_stack = np.ndarray([4000, 2048, 2048], dtype="uint8", buffer=shm.buf) #attach a numpy array to the memory object
    while True: #until the worker runs out of work
        try:
            task = in_q.get(timeout=1) #don't wait forever on anything if you can help it (timeout must be passed by keyword; get(1) would only set block=True)
        except Empty: #multiprocessing.Queue uses an exception template from the queue library
            print("assuming all tasks are done. worker exiting...") #assume waiting for a while means no more tasks (we shouldn't hit this, but it could prevent problems in the child if a crash happens elsewhere)
            break
        if isinstance(task, STOPFLAG):
            print("got stop flag. worker exiting...")
            break
        
        #process the image slice (no mutexes are needed because no two workers will ever get the same index to work on at the same time)
        img_stack[task] = cv2.GaussianBlur(img_stack[task],(5,5),0) 
        
    shm.close() #cleanup after yourself (close the local copy. This does not close the copy in the other processes)

if __name__ == "__main__": #this is needed with multiprocessing

    #create shared memory space where numpy will work from
    shm = shared_memory.SharedMemory(create=True, size=4000*2048*2048) #OS may have a hard time allocating this memory block because it's so big...
    #create the numpy array from the allocated memory
    img_stack = np.ndarray([4000, 2048, 2048], dtype="uint8", buffer=shm.buf)
    
    #Here is where you would load the image data onto the img_stack array. It will start out with whatever random data was previously in ram similar to numpy.empty.
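    #for example, a hedged sketch (this assumes the tif is 8-bit; a 16-bit file
    #would need dtype="uint16" above and twice the shared memory size):
    #    from tifffile import imread
    #    img_stack[:] = imread("F:\\Ctest\\123.tif")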
    
    #create a queue to send workers tasks (image index to work on)
    in_q = Queue()
    
    #create a couple worker processes
    processes = [Process(target=worker_process, args = (in_q, shm.name)) for _ in range(cpu_count())]
    for p in processes:
        p.start()
    
    #fill up the task queue with image indices that need computation
    for i in range(4000):
        in_q.put(i)
        
    #send a stop signal for each worker
    for _ in processes:
        in_q.put(STOPFLAG())
        
    #wait for all children to finish
    for p in processes:
        p.join()
        
    #do something (save?) with the img_stack
    np.save("processed_images.npy", img_stack)
    
    shm.close() #cleanup
    shm.unlink() #unlink is called only once after the last instance has been "close()"d
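Side note: since the question already imports tifffile, the np.save at the end could just as well write the processed stack back out as a tif (the output path here is illustrative):

from tifffile import imwrite

imwrite("F:\\Ctest\\123_blurred.tif", img_stack) #write the processed stack to disk as a tif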
