首页 > 解决方案 > 对未来结果的管理不善减慢了业绩

问题描述

我正在寻找有关如何解决下面描述的瓶颈的任何建议。

在一个 dask 分布式基础架构中,我映射了一些未来,并在它们准备好时获得结果。一旦检索到,我必须调用一个耗时、阻塞的“pandas”函数,不幸的是,这个函数无法避免。最好的办法是让我创建另一个进程,从 for 循环中分离出来,它能够摄取结果流。对于示例中不存在的其他约束,输出无法序列化并发送给工作人员,必须在主服务器上处理。

这是一个小样机。抓住这个想法,不要过多关注代码的细节。

class pxldrl(object):
    def __init__(self, df):
        self.table = df

def simulation(list_param):
    time.sleep(random.random())
    val = sum(list_param)/4
    if val < 0.5:
        result = {'param_e': val}
    else:
        result = {'param_f': val}
    return pxldrl(result)

def costly_function(result, output):
    time.sleep(1)
    # blocking pandas function 
    output = output.append(result.table, sort=False, ignore_index=True)

    return output

def main():
    client = Client(n_workers=4, threads_per_worker=1)

    output = pd.DataFrame(columns=['param_e', 'param_f'])

    input = pd.DataFrame(np.random.random(size=(100, 4)),
                                columns=['param_a', 'param_b', 'param_c', 'param_d'])

    for i in range(2):

        futures = client.map(simulation, input.values)

        for future, result in as_completed(futures, with_results=True):
            output = costly_function(result, output)

标签: daskdask-distributed

解决方案


听起来您想costly_function在单独的线程中运行。也许您可以使用threadingorconcurrent.futures模块在单独的线程上运行整个例程?

如果您想变得花哨,您甚至可以再次使用 Dask 并创建第二个在此过程中运行的客户端:

local_client = Client(processes=False)

并使用它。(尽管您必须小心在客户之间混合期货,这是行不通的)


推荐阅读