dask - 对未来结果的管理不善减慢了业绩
问题描述
我正在寻找有关如何解决下面描述的瓶颈的任何建议。
在一个 dask 分布式基础架构中,我映射了一些未来,并在它们准备好时获得结果。一旦检索到,我必须调用一个耗时、阻塞的“pandas”函数,不幸的是,这个函数无法避免。最好的办法是让我创建另一个进程,从 for 循环中分离出来,它能够摄取结果流。对于示例中不存在的其他约束,输出无法序列化并发送给工作人员,必须在主服务器上处理。
这是一个小样机。抓住这个想法,不要过多关注代码的细节。
class pxldrl(object):
def __init__(self, df):
self.table = df
def simulation(list_param):
time.sleep(random.random())
val = sum(list_param)/4
if val < 0.5:
result = {'param_e': val}
else:
result = {'param_f': val}
return pxldrl(result)
def costly_function(result, output):
time.sleep(1)
# blocking pandas function
output = output.append(result.table, sort=False, ignore_index=True)
return output
def main():
client = Client(n_workers=4, threads_per_worker=1)
output = pd.DataFrame(columns=['param_e', 'param_f'])
input = pd.DataFrame(np.random.random(size=(100, 4)),
columns=['param_a', 'param_b', 'param_c', 'param_d'])
for i in range(2):
futures = client.map(simulation, input.values)
for future, result in as_completed(futures, with_results=True):
output = costly_function(result, output)
解决方案
听起来您想costly_function
在单独的线程中运行。也许您可以使用threading
orconcurrent.futures
模块在单独的线程上运行整个例程?
如果您想变得花哨,您甚至可以再次使用 Dask 并创建第二个在此过程中运行的客户端:
local_client = Client(processes=False)
并使用它。(尽管您必须小心在客户之间混合期货,这是行不通的)
推荐阅读
- r - 如何通过使用多个字段来计算带有组的值
- firebase - 如何管理具有可变时间间隔的重复出现的 Cloudfunctions?
- arm - Linux 内核模块作弊 - Qemu Baremetal Xilinx Zynq A9
- c++ - 提升共享内存映射地址
- python - Python 在一行中初始化 np.zeros() 并在 index(x) 中设置值
- html - 如何在Angular 8中的选择中将值默认为“选择问题”?
- excel - 使用 Excel 宏在一张表中创建具有特定范围的 PDF
- python-3.x - 从测试内部访问 pytest capsys
- php - 如何使用我的 react native apk 与不同的 wifi ip?
- python - 对于这个问题,我被困在一行返回语句中