首页 > 解决方案 > 与 Ray 并行并通过超时获取结果

问题描述

我制作的并行文档刮刀遇到了问题。

我正在使用 Ray 库和@ray.remote运行良好的装饰器。获取结果时会出现问题。这是我的代码片段:

ray.init(num_cpus=n_workers)
futures = [worker.remote(x) for x in path_and_dest]

# get results
for doc in futures:
    try:
        ray.get(doc, timeout = timeout)
        pbar1.update(1) # add 1 to success bar
    except RayTimeoutError:
        pbar2.update(1) # add 1 to failure bar
    except Exception as error:
        print("function raised %s" % error)
        print(error.traceback)

worker函数在给定路径的情况下抓取文档并将输出保存到目的地(作为变量中的参数给出path_and_dest)。timeout如果抓取文档的时间超过一组(以秒为单位),我尝试实现的功能将终止一个进程。

问题:

目前,如果由于 try-except 处理和我遍历futures-object 的方式而“失败”,则所有进程都会挂起。例如,如果我使用 8 个内核并且所有 8 个进程都超过了timeout它们应该同时失败的值,那么现在它们都需要8 * timeout几秒钟才能失败。

标签: pythonmultithreadingparallel-processingray

解决方案


ray.wait()api 与 while 循环一起使用。详细信息在此处的参考中:https ://ray.readthedocs.io/en/latest/package-ref.html#ray.wait

主要思想是运行一段时间循环并为wait期货引发超时错误。

伪代码

curr = 0
timeout_cnt = 10
while curr < timeout_cnt:
    ready, wait = ray.wait(futures)
    # Do some ready ID processing here.

    curr += 1
    time.sleep(0.1)
    if len(wait) == 0:
        break

推荐阅读