python - 与 Ray 并行并通过超时获取结果
问题描述
我制作的并行文档刮刀遇到了问题。
我正在使用 Ray 库和@ray.remote
运行良好的装饰器。获取结果时会出现问题。这是我的代码片段:
ray.init(num_cpus=n_workers)
futures = [worker.remote(x) for x in path_and_dest]
# get results
for doc in futures:
try:
ray.get(doc, timeout = timeout)
pbar1.update(1) # add 1 to success bar
except RayTimeoutError:
pbar2.update(1) # add 1 to failure bar
except Exception as error:
print("function raised %s" % error)
print(error.traceback)
该worker
函数在给定路径的情况下抓取文档并将输出保存到目的地(作为变量中的参数给出path_and_dest
)。timeout
如果抓取文档的时间超过一组(以秒为单位),我尝试实现的功能将终止一个进程。
问题:
目前,如果由于 try-except 处理和我遍历futures
-object 的方式而“失败”,则所有进程都会挂起。例如,如果我使用 8 个内核并且所有 8 个进程都超过了timeout
它们应该同时失败的值,那么现在它们都需要8 * timeout
几秒钟才能失败。
解决方案
将ray.wait()
api 与 while 循环一起使用。详细信息在此处的参考中:https ://ray.readthedocs.io/en/latest/package-ref.html#ray.wait
主要思想是运行一段时间循环并为wait
期货引发超时错误。
伪代码
curr = 0
timeout_cnt = 10
while curr < timeout_cnt:
ready, wait = ray.wait(futures)
# Do some ready ID processing here.
curr += 1
time.sleep(0.1)
if len(wait) == 0:
break
推荐阅读
- android - Kotlin - 如何从回调中访问变量?
- javascript - Vue Material - 路由器视图上的本机转换
- reactjs - 基于 prop 在 React 组件中创建泛型类
- javascript - 你能用Javascript访问浏览器内存转储吗
- php - 在提交订单时显示付款弹出窗口
- c++ - 如何将 ICU 库添加到 ARM 的 Qt 项目?
- javascript - electron-vue ~ 无法使用 RxJS 订阅更新 vuex 状态
- python - 将函数参数设置为全局变量
- r - Rstudio:用循环替换复制和粘贴
- php - 有没有办法更新行以添加批次 ID 并返回该 ID?