python - 使用 Python 更新并行计算的参数
问题描述
我有一些 Python 代码可以执行下面列出的操作。该calc_result()
函数根据输入参数生成结果。在每一步,这些输入参数都会更新以计算一组新的结果。重复此过程直到最后一步。下面给出了这个过程的示例代码。
- 定义初始参数
- 根据初始参数计算结果
- 根据结果更新参数
- 根据更新的参数计算结果
- 重复 3 和 4 直到最后一步
工作示例
import numpy as np
import random
import time
def calc_params(res: list) -> list:
time.sleep(random.random())
return [r * 1.1 for r in res]
def calc_result(param: float):
time.sleep(random.random())
return param + 1
def main():
tic = time.perf_counter()
nsteps = 10
nmodels = 4
init_params = [5, 4.5, 8, 2]
steps = list(range(nsteps))
params = np.zeros((nsteps, nmodels))
params[0] = init_params
results = np.zeros((nsteps, nmodels))
for step in steps:
step_results = []
for p in params[step]:
out = calc_result(p)
step_results.append(out)
results[step] = step_results
if step < nsteps - 1:
params[step + 1] = calc_params(step_results)
toc = time.perf_counter()
print(f'\nElapsed time {toc - tic:.2f} s\n')
print(f'Parameters\n{params}\n')
print(f'Results\n{results}')
if __name__ == '__main__':
np.set_printoptions(precision=2)
main()
此示例打印以下输出:
Elapsed time 22.75 s
Parameters
[[ 5. 4.5 8. 2. ]
[ 6.6 6.05 9.9 3.3 ]
[ 8.36 7.76 11.99 4.73]
[10.3 9.63 14.29 6.3 ]
[12.43 11.69 16.82 8.03]
[14.77 13.96 19.6 9.94]
[17.34 16.46 22.66 12.03]
[20.18 19.21 26.03 14.33]
[23.3 22.23 29.73 16.87]
[26.73 25.55 33.8 19.65]]
Results
[[ 6. 5.5 9. 3. ]
[ 7.6 7.05 10.9 4.3 ]
[ 9.36 8.76 12.99 5.73]
[11.3 10.63 15.29 7.3 ]
[13.43 12.69 17.82 9.03]
[15.77 14.96 20.6 10.94]
[18.34 17.46 23.66 13.03]
[21.18 20.21 27.03 15.33]
[24.3 23.23 30.73 17.87]
[27.73 26.55 34.8 20.65]]
示例
我尝试使用 Dask 并行化代码,如下所示。
import numpy as np
import random
import time
from dask.distributed import Client, get_client, secede, rejoin
def calc_params(res: list) -> list:
time.sleep(random.random())
return [r * 1.1 for r in res]
def calc_result(param: float):
time.sleep(random.random())
return param + 1
def solve_step(step: int, nsteps: int, params: np.ndarray) -> list:
client = get_client()
futures = client.map(calc_result, params[step], priority=10)
secede()
step_results = client.gather(futures)
rejoin()
if step < nsteps - 1:
params[step + 1] = calc_params(step_results)
return step_results
def main():
tic = time.perf_counter()
nsteps = 10
nmodels = 4
init_params = [5, 4.5, 8, 2]
params = np.zeros((nsteps, nmodels))
params[0] = init_params
steps = list(range(nsteps))
futures = client.map(solve_step, steps, pure=False, nsteps=nsteps, params=params)
results = client.gather(futures)
results = np.array(results)
toc = time.perf_counter()
print(f'\nElapsed time {toc - tic:.2f} s\n')
print(f'Parameters\n{params}\n')
print(f'Results\n{results}')
if __name__ == '__main__':
np.set_printoptions(precision=2)
client = Client(n_workers=8)
print('\n' + client.dashboard_link)
main()
client.close()
Dask 示例的输出:
Elapsed time 1.85 s
Parameters
[[5. 4.5 8. 2. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]
[0. 0. 0. 0. ]]
Results
[[6. 5.5 9. 3. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]
[1. 1. 1. 1. ]]
Dask 示例不更新参数值,因此计算结果不正确。有没有办法在并行计算结果的同时更新参数数组?我尝试遵循 Dask 示例处理进化工作流中的方法,但它似乎不适用于这个问题。Dask 也有Actors,但根据文档,它是一个实验性功能,并且 Dask 仪表板在使用 Actors 时不显示信息。另一个用于扩展 Python 的包是Ray,但我没有使用它的经验。
解决方案
鉴于您的问题描述,迭代过程似乎对算法很重要。所以我不知道你应该如何并行化它。Dask 无法神奇地知道后面步骤的结果是什么。该示例中的“不断发展的工作流程”是指触发更多作业的工作流程 - 这与您的情况不同,其中每个工作流程阶段都取决于前一个。
考虑一个非常简单的工作流程示例:
result = 0
for i in range(10):
result += 1
您可以将其重构为:
def inc(val):
return val + 1
result = 0
for i in range(10):
result = inc(val)
但你不能做的是
# results in a list of 10 futures, each of which results in the number 2
result = client.map(inc, val=result)
推荐阅读
- java - 如何覆盖数组?
- xml - 从元素中删除前缀
- javascript - javaScriptExecutor 返回的 Null
- xamarin - 使用带有 Xamarin Forms 4.0 Shell 的 IoC 容器来解析页面实例
- android - 如何在另一个 Activity 中显示数组列表?
- ruby - Jekyll 错误:nil:NilClass 的未定义方法祖先
- javascript - 网络抓取时的请求问题和欢呼
- php - 修改 request() 数组返回错误 间接修改重载属性
- angularjs - 具有隔离范围的 AngularJS 指令
- java - Apache POI 在编辑后保留预定义的宏