首页 > 解决方案 > 使用 Python 更新并行计算的参数

问题描述

我有一些 Python 代码可以执行下面列出的操作。该calc_result()函数根据输入参数生成结果。在每一步,这些输入参数都会更新以计算一组新的结果。重复此过程直到最后一步。下面给出了这个过程的示例代码。

  1. 定义初始参数
  2. 根据初始参数计算结果
  3. 根据结果​​更新参数
  4. 根据更新的参数计算结果
  5. 重复 3 和 4 直到最后一步

工作示例

import numpy as np
import random
import time

def calc_params(res: list) -> list:
    time.sleep(random.random())
    return [r * 1.1 for r in res]

def calc_result(param: float):
    time.sleep(random.random())
    return param + 1

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    steps = list(range(nsteps))

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    results = np.zeros((nsteps, nmodels))

    for step in steps:

        step_results = []

        for p in params[step]:
            out = calc_result(p)
            step_results.append(out)

        results[step] = step_results

        if step < nsteps - 1:
            params[step + 1] = calc_params(step_results)

    toc = time.perf_counter()

    print(f'\nElapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)
    main()

此示例打印以下输出:

Elapsed time 22.75 s

Parameters
[[ 5.    4.5   8.    2.  ]
 [ 6.6   6.05  9.9   3.3 ]
 [ 8.36  7.76 11.99  4.73]
 [10.3   9.63 14.29  6.3 ]
 [12.43 11.69 16.82  8.03]
 [14.77 13.96 19.6   9.94]
 [17.34 16.46 22.66 12.03]
 [20.18 19.21 26.03 14.33]
 [23.3  22.23 29.73 16.87]
 [26.73 25.55 33.8  19.65]]

Results
[[ 6.    5.5   9.    3.  ]
 [ 7.6   7.05 10.9   4.3 ]
 [ 9.36  8.76 12.99  5.73]
 [11.3  10.63 15.29  7.3 ]
 [13.43 12.69 17.82  9.03]
 [15.77 14.96 20.6  10.94]
 [18.34 17.46 23.66 13.03]
 [21.18 20.21 27.03 15.33]
 [24.3  23.23 30.73 17.87]
 [27.73 26.55 34.8  20.65]]

示例

我尝试使用 Dask 并行化代码,如下所示。

import numpy as np
import random
import time
from dask.distributed import Client, get_client, secede, rejoin

def calc_params(res: list) -> list:
    time.sleep(random.random())
    return [r * 1.1 for r in res]

def calc_result(param: float):
    time.sleep(random.random())
    return param + 1

def solve_step(step: int, nsteps: int, params: np.ndarray) -> list:
    client = get_client()

    futures = client.map(calc_result, params[step], priority=10)
    secede()

    step_results = client.gather(futures)
    rejoin()

    if step < nsteps - 1:
        params[step + 1] = calc_params(step_results)

    return step_results

def main():
    tic = time.perf_counter()

    nsteps = 10
    nmodels = 4
    init_params = [5, 4.5, 8, 2]

    params = np.zeros((nsteps, nmodels))
    params[0] = init_params

    steps = list(range(nsteps))

    futures = client.map(solve_step, steps, pure=False, nsteps=nsteps, params=params)
    results = client.gather(futures)
    results = np.array(results)

    toc = time.perf_counter()

    print(f'\nElapsed time {toc - tic:.2f} s\n')
    print(f'Parameters\n{params}\n')
    print(f'Results\n{results}')

if __name__ == '__main__':
    np.set_printoptions(precision=2)

    client = Client(n_workers=8)
    print('\n' + client.dashboard_link)

    main()

    client.close()

Dask 示例的输出:

Elapsed time 1.85 s

Parameters
[[5.  4.5 8.  2. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]
 [0.  0.  0.  0. ]]

Results
[[6.  5.5 9.  3. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]
 [1.  1.  1.  1. ]]

Dask 示例不更新参数值,因此计算结果不正确。有没有办法在并行计算结果的同时更新参数数组?我尝试遵循 Dask 示例处理进化工作流中的方法,但它似乎不适用于这个问题。Dask 也有Actors,但根据文档,它是一个实验性功能,并且 Dask 仪表板在使用 Actors 时不显示信息。另一个用于扩展 Python 的包是Ray,但我没有使用它的经验。

标签: pythonnumpydaskdask-distributedray

解决方案


鉴于您的问题描述,迭代过程似乎对算法很重要。所以我不知道你应该如何并行化它。Dask 无法神奇地知道后面步骤的结果是什么。该示例中的“不断发展的工作流程”是指触发更多作业的工作流程 - 这与您的情况不同,其中每个工作流程阶段都取决于前一个。

考虑一个非常简单的工作流程示例:

result = 0
for i in range(10):
    result += 1

您可以将其重构为:

def inc(val):
    return val + 1

result = 0
for i in range(10):
    result = inc(val)

但你不能做的是

 # results in a list of 10 futures, each of which results in the number 2
result = client.map(inc, val=result)

推荐阅读