首页 > 解决方案 > 带锁的同步 numpy 2D 数组计数器

问题描述

我想并行化一个在共享 numpy 2D 数组上运行的方法。

我的原始应用程序是研究的一部分并且非常复杂,但是,我创建了一个玩具示例,基本上复制了这些问题。

有一家服装店,出售不同尺寸和颜色的衣服。我将这家商店的库存表示为一个 2D 矩阵,其中表示和self.supply_arr[i][j]的衣服的总可用性。我有多个客户试图从商店购买。商店出售的衣服不应超过其库存。下面,我展示一个非平行的例子。size icolor j

import numpy as np


class ClothStore(object):
    def __init__(self, num_customers):
        self.supply_arr = np.random.randint(5, size=(2,2))
        self.sold_arr = np.zeros((2,2), dtype=int)
        self.num_customers = num_customers

    def make_purchase(self, size, color):
        left = self.supply_arr[size][color] - self.sold_arr[size][color]
        if left > 0:
            self.sold_arr[size][color] += 1
            return True
        else:
            return False

    def run(self):
        for customer in xrange(self.num_customers):
            size = np.random.randint(2)
            color = np.random.randint(2)

            purchase = self.make_purchase(size, color)

            if purchase:
                print "Customer: {} made successful purchase".format(customer)

if __name__ == "__main__":
    store = ClothStore(100)
    store.run()

    print "Supply Arr: {}".format(store.supply_arr)
    print "Sold Arr: {}".format(store.sold_arr)

我试图并行化该run(self)方法,使用pathos并表示我初始化self.supply_arrnp.empty((2,2), dtype=object)的每个元素的位置multiprocessing.Value。但是,我无法让它工作。任何帮助,将不胜感激。谢谢你。

标签: synchronizationpython-multiprocessingnumpy-ndarray

解决方案


我设法用迂回的方式解决了我自己的问题。这不是最优雅的方式,但它确实有效。我真的很感激帮助使它更优雅。

import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocess import Manager


class ClothStoreNew(object):
    def __init__(self, num_customers):
        self.supply_arr = np.random.randint(5, size=(2, 2))
        self.num_customers = num_customers

    def make_purchase(self, arg):
        sold_dict = arg[0]
        i = arg[1]

        size = self.demand[i][1]
        color = self.demand[i][2]
        sold = sold_dict.get((size, color), 0)
        if self.supply_arr[size][color] > sold:
            sold_dict[(size, color)] = sold + 1

    def run(self):
        m = Manager()
        sold_dict = m.dict()
        pool = Pool(processes=100)
        self.demand = []
        for customer in xrange(self.num_customers):
            size = np.random.randint(1)
            color = np.random.randint(1)
            self.demand.append([customer, size, color])

        pool.map(self.make_purchase, ([sold_dict, i] for i in xrange(self.num_customers)))
        pool.close()
        pool.join()
        return dict(sold_dict)


if __name__ == "__main__":
    store = ClothStoreNew(20)
    sold_dict = store.run()
    print "Supply Arr: {}".format(store.supply_arr)
    print "Sold Dict: {}".format(sold_dict)

如您所见,我正在使用manager.dict()同步。我想使用manager.list(),但它似乎不起作用。此外,Manager对于每次更新使用锁定整个字典,理想的解决方案是一次锁定字典的每个单独的键(或 2D 矩阵的每个单独的单元格),以便在其他单元格上运行的进程没有等待。


推荐阅读