首页 > 解决方案 > 如何在 Azure Functions (python) 中模拟队列输出绑定?

问题描述

我有一个函数可以获取一些数据,对其进行处理,然后根据结果(如果工作失败或成功)向存储队列发送消息。

下面,一个简化的版本:

import json

import azure.functions as func


def main(job_data: dict, msg: func.Out[str]):
    
    try:
        # work with job_data
        status = 'succeeded'
    except:
        status = 'failed'

    msg.set(json.dumps({'status': status}))

该函数的绑定模式如下:

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "job_data",
      "type": "activityTrigger",
      "direction": "in"
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "msg",
      "queueName": "datalakejobs",
      "connection": "AzureWebJobsStorage"
    } 
  ]
}

我正在使用 pytest 为这个函数编写一个测试。问题是我不知道如何模拟msg主函数的参数。我尝试使用该类func.Out,如下:

import azure.functions as func
import pytest

import activity_send_message


@pytest.fixture
def job_data():
    return {'job_data': 'job'} # actually, it is just a representation of the data

def test_upload_data(job_data):
    r = activity_send_message.main(job_data, func.Out)
    assert True

此测试返回以下错误:

>       msg.set(json.dumps({'status': status}))
E       TypeError: set() missing 1 required positional argument: 'val'

我认为问题出在msg模拟...如何模拟绑定msg参数以使其工作?

标签: pythonazure-functions

解决方案


推荐阅读