首页 > 解决方案 > 什么是 python ray 语法将导入的函数用作带有装饰器 arg 的射线远程函数,例如 num_cpus

问题描述

我正在尝试将导入的函数用作射线远程函数,但是声明远程函数的常用语法似乎不起作用。stackoverflow 上还有其他几个相关问题没有完全回答我的问题,请参阅:herehere。Link 1“似乎”解决了这个问题,但看起来很笨重,而且看起来不像我期望的 ray 开发人员打算使用它的方式。Ref 2 解决了一个类似的问题“except”,如果您需要为修饰函数提供参数(就像我在我的情况下所做的那样),那么它会返回以下错误:

AssertionError: The @ray.remote decorator must be applied either with no arguments and no parentheses, for example '@ray.remote', or it must be applied using some of the arguments 'num_returns', 'num_cpus', 'num_gpus', 'memory', 'object_store_memory', 'resources', 'max_calls', or 'max_restarts', like '@ray.remote(num_returns=2, resources={"CustomResource": 1})'.

这是完整工作代码的示例:

import pandas as pd 
import numpy as np
import ray 
from ray.cluster_utils import Cluster 

config = {
    "cluster_cpus":3,
    "n_cores":1
}


df1 = pd.DataFrame()
df1['a'] = [1,2,3]
df1['b'] = [4,5,6]


try:

    # init ray 
    # Start a head node for the cluster
    if not ray.is_initialized():
        master_cluster = Cluster(
            initialize_head=True,
            head_node_args={"num_cpus":config["cluster_cpus"]}
            )

        # start ray (after initializing cluster)
        try:
            ray.init(address=master_cluster.address, include_dashboard=False, log_to_driver=True)
        except TypeError:
            ray.init(address=master_cluster.address, include_webui=False)

        df_id = df1

        @ray.remote(num_cpus=config["n_cores"])
        def sp_workflow(df, sp):

            sp["output"] = df.sum(axis=1).values * sp["input"]

            return sp        
  

        model_pool = [{"input":1},{"input":2},{"input":3}]
        outputs = []
        result_ids = [sp_workflow.remote(df=df_id, sp=sp) for sp in model_pool]
        
        # Loop over the pending results and process completed jobs
        while len(result_ids):
            done_id, result_ids = ray.wait(result_ids)
            sp = ray.get(done_id[0])   
            outputs.append(sp)       

        print(outputs)     

except Exception as e:
    raise e

finally:
    if ray.is_initialized():
        ray.shutdown() 

但是,在我的情况下,'sp_workflow' 是存储在另一个脚本中的函数,因此不能将 @ray.remote(num_cpus=config["n_cores") 装饰器应用于它。如果我需要指定核心数量来提供我希望远程使用的导入功能,那么它在 ray 文档中并不清楚如何做到这一点?- 除非我错过了什么?

我尝试将 sp_workflow 定义替换为导入的版本,并将远程调用替换为以下行,但它给出了前面提到的错误:

from other_library import sp_workflow
result_ids = [ray.remote(sp_workflow(df=df_id, sp=sp), num_cpus=config["n_cores"] ) for sp in model_pool]

标签: pythonmultithreadingmultiprocessingray

解决方案


按照您帖子中提到的 Ray Nishihara 的回答,我建议您不要错过将函数转换为远程版本的第一步,remote_foo = ray.remote(local_foo)您可以在其中传递 num_cpus 参数。

所以在你的情况下应该是这样的:

from other_library import sp_workflow
sp_workflow_rm = ray.remote(sp_workflow, config["n_cores"])
result_ids = [ray.remote(sp_workflow_rm(df=df_id, sp=sp)) for sp in model_pool]

我没有测试它,希望它对你有用。


推荐阅读