首页 > 解决方案 > Azure 流分析:云作业中的 ML 服务函数调用导致无输出事件

问题描述

我遇到了 Azure 流分析 (ASA) 作业的问题,该作业应调用 Azure ML 服务函数来对提供的输入数据进行评分。该查询是在 Visual Studio (VS) 2019 中使用“Azure 数据湖和流分析工具”扩展开发和测试的。该作业使用 Azure IoT-Hub 作为输入,并将 VS 本地输出作为输出用于测试目的(后来甚至使用 Blobstorage)。在此环境中一切正常,对 ML Service 函数的调用成功,并返回所需的响应。在云作业中使用相同的查询、用户定义的函数和聚合(如 VS 中的),不会生成任何输出事件(既没有 Blobstorage 也没有 Power BI 作为输出)。在 ML Webservice 中可以看到,ASA 成功调用了该函数,但不知何故没有返回任何响应数据。

对于 ML Web 服务的部署,我尝试了以下方法(适用于 VS,在云中没有输出):

推理脚本函数架构:

推理脚本输入架构如下所示:

@input_schema('data', NumpyParameterType(input_sample, enforce_shape=False))
@output_schema(NumpyParameterType(output_sample))  # other parameter type for record caused error in ASA
def run(data):
    response = {'score1': 0,
                'score2': 0,
                'score3': 0,
                'score4': 0,
                'score5': 0,
                'highest_score': None}

和返回值:

return [response]

带有 ML 函数调用的 ASA 作业子查询:

with raw_scores as (
select
    time, udf.HMMscore(udf.numpyfySeq(Sequence)) as score
from Sequence
)

和 UDF“numpyfySeq”,如:

// creates a N x 18 size array
function numpyfySeq(Sequence) {
    'use strict';
    var transpose = m => m[0].map((x, i) => m.map(x => x[i]));
    var array = [];
    for (var feature in Sequence) {
        if (feature != "time") {
            array.push(Sequence[feature])
        }
    }
    return transpose(array);
}

“序列”是一个子查询,它使用用户定义的聚合将数据聚合成序列(数组)。

在 VS 中,数据来自 IoT-Hub(选择了云输入)。如图所示,“函数签名”在门户中被正确识别:函数签名

我希望提供的信息是足够的,你可以帮助我。

编辑:

Azure ML Web 服务的身份验证是基于密钥的。在ASA中,选择使用“ Azure ML服务”功能时,它将自动检测并使用订阅和ML工作区中部署的ML模型中的键。使用的部署代码(在此示例中为 ACI,但在 AKS 部署中看起来几乎相同):

from azureml.core.model import InferenceConfig, Model
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.webservice import AciWebservice


ws = Workspace.from_config()
env = Environment(name='scoring_env')
deps = CondaDependencies(conda_dependencies_file_path='./deps')
env.python.conda_dependencies = deps

inference_config = InferenceConfig(source_directory='./prediction/', 
                                   entry_script='score.py', 
                                   environment=env)
deployment_config = AciWebservice.deploy_configuration(auth_enabled=True, cpu_cores=1, 
                                                       memory_gb=1)
model = Model(ws, 'HMM')

service = Model.deploy(ws, 'hmm-scoring', models, 
                       inference_config, 
                       deployment_config, 
                       overwrite=True,)

service.wait_for_deployment(show_output=True)

与 conda_dependencies:

name: project_environment
dependencies:
  # The python interpreter version.

  # Currently Azure ML only supports 3.5.2 and later.

- python=3.7.5

- pip:
  - sklearn
  - azureml-core
  - azureml-defaults
  - inference-schema[numpy-support]
  - hmmlearn
- numpy
- pip
channels:
- anaconda
- conda-forge

score.py 中使用的代码只是一个常规的分数操作,加载的模型和格式如下:

score1 = model1.score(data)
score2 = model2.score(data)
score3 = model3.score(data)
# Same scoring with model4 and model5

# scaling of the scores to a defined interval and determination of model that delivered highest score

response['score1'] = score1
response['score2'] = score2
# and so on

标签: azureazure-stream-analyticsazure-machine-learning-service

解决方案


推荐阅读