python - 如何在 Azure Functions (python) 中模拟队列输出绑定?
问题描述
我有一个函数可以获取一些数据,对其进行处理,然后根据结果(如果工作失败或成功)向存储队列发送消息。
下面,一个简化的版本:
import json
import azure.functions as func
def main(job_data: dict, msg: func.Out[str]):
try:
# work with job_data
status = 'succeeded'
except:
status = 'failed'
msg.set(json.dumps({'status': status}))
该函数的绑定模式如下:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "job_data",
"type": "activityTrigger",
"direction": "in"
},
{
"type": "queue",
"direction": "out",
"name": "msg",
"queueName": "datalakejobs",
"connection": "AzureWebJobsStorage"
}
]
}
我正在使用 pytest 为这个函数编写一个测试。问题是我不知道如何模拟msg
主函数的参数。我尝试使用该类func.Out
,如下:
import azure.functions as func
import pytest
import activity_send_message
@pytest.fixture
def job_data():
return {'job_data': 'job'} # actually, it is just a representation of the data
def test_upload_data(job_data):
r = activity_send_message.main(job_data, func.Out)
assert True
此测试返回以下错误:
> msg.set(json.dumps({'status': status}))
E TypeError: set() missing 1 required positional argument: 'val'
我认为问题出在msg
模拟...如何模拟绑定msg
参数以使其工作?
解决方案
推荐阅读
- mysql - MySQL JOIN 使用多个列和表
- java - 冷融合 QR 图像 - JAVA 路径
- python - 如何使用 ColumnDataSource 更新 Span (Bokeh)?
- python - Discord Bot DM 问卷调查 discord py
- javascript - 如何在 React.js 中使用 react-router-dom 保持 App() 级别状态不重置?
- python - 如何在启动时最大化 tkinter 应用程序,就像模仿最大化按钮一样?
- python-3.x - Google Pub/Sub - 将消息发布到主题后,未从本地函数中找到事件数据
- python - Python 文字游戏工作
- python - 在不关心结果的情况下并行运行任务的最佳方法是什么?
- python - 使用python,pandas将相同的ID行从excel复制到另一个