python - 什么是 python ray 语法将导入的函数用作带有装饰器 arg 的射线远程函数,例如 num_cpus
问题描述
我正在尝试将导入的函数用作射线远程函数,但是声明远程函数的常用语法似乎不起作用。stackoverflow 上还有其他几个相关问题没有完全回答我的问题,请参阅:here和here。Link 1“似乎”解决了这个问题,但看起来很笨重,而且看起来不像我期望的 ray 开发人员打算使用它的方式。Ref 2 解决了一个类似的问题“except”,如果您需要为修饰函数提供参数(就像我在我的情况下所做的那样),那么它会返回以下错误:
AssertionError: The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied using some of the arguments 'num_returns', 'num_cpus', 'num_gpus', 'memory', 'object_store_memory', 'resources', 'max_calls', or 'max_restarts', like '@ray.remote(num_returns=2, resources={"CustomResource": 1})'.
这是完整工作代码的示例:
import pandas as pd
import numpy as np
import ray
from ray.cluster_utils import Cluster
config = {
"cluster_cpus":3,
"n_cores":1
}
df1 = pd.DataFrame()
df1['a'] = [1,2,3]
df1['b'] = [4,5,6]
try:
# init ray
# Start a head node for the cluster
if not ray.is_initialized():
master_cluster = Cluster(
initialize_head=True,
head_node_args={"num_cpus":config["cluster_cpus"]}
)
# start ray (after initializing cluster)
try:
ray.init(address=master_cluster.address, include_dashboard=False, log_to_driver=True)
except TypeError:
ray.init(address=master_cluster.address, include_webui=False)
df_id = df1
@ray.remote(num_cpus=config["n_cores"])
def sp_workflow(df, sp):
sp["output"] = df.sum(axis=1).values * sp["input"]
return sp
model_pool = [{"input":1},{"input":2},{"input":3}]
outputs = []
result_ids = [sp_workflow.remote(df=df_id, sp=sp) for sp in model_pool]
# Loop over the pending results and process completed jobs
while len(result_ids):
done_id, result_ids = ray.wait(result_ids)
sp = ray.get(done_id[0])
outputs.append(sp)
print(outputs)
except Exception as e:
raise e
finally:
if ray.is_initialized():
ray.shutdown()
但是,在我的情况下,'sp_workflow' 是存储在另一个脚本中的函数,因此不能将 @ray.remote(num_cpus=config["n_cores") 装饰器应用于它。如果我需要指定核心数量来提供我希望远程使用的导入功能,那么它在 ray 文档中并不清楚如何做到这一点?- 除非我错过了什么?
我尝试将 sp_workflow 定义替换为导入的版本,并将远程调用替换为以下行,但它给出了前面提到的错误:
from other_library import sp_workflow
result_ids = [ray.remote(sp_workflow(df=df_id, sp=sp), num_cpus=config["n_cores"] ) for sp in model_pool]
解决方案
按照您帖子中提到的 Ray Nishihara 的回答,我建议您不要错过将函数转换为远程版本的第一步,remote_foo = ray.remote(local_foo)
您可以在其中传递 num_cpus 参数。
所以在你的情况下应该是这样的:
from other_library import sp_workflow
sp_workflow_rm = ray.remote(sp_workflow, config["n_cores"])
result_ids = [ray.remote(sp_workflow_rm(df=df_id, sp=sp)) for sp in model_pool]
我没有测试它,希望它对你有用。
推荐阅读
- javascript - 如何在 redis 中找到部分匹配的值并更新它是否已经存在?
- d3.js - 如何控制堆积条形图中 x 轴标签位置的间距?
- maven - Maven Dependency 的依赖
- node.js - Docker 错误:没有这样的文件或目录,打开 '/package.json'
- c++ - 在函数与类/命名空间范围内行为不同的 C++ 宏值
- amazon-web-services - 为什么我无法访问 AWS IAM 列表,尽管我拥有所有 IAM 权限?
- android - Xamarin.Forms Android 对象引用未设置为对象的实例
- ios - IOS/Objective-C:子视图的子视图不显示
- javascript - 完整的背景图像鼠标移动覆盖范围
- java - 动态设置视图样式