python-3.x - 如何模拟 elasticsearch.helpers.bulk
问题描述
我正在测试一个bulk
用于索引某些文档的类。
这是我的代码:
import mock
import unittest
import json
from elasticsearch.helpers import bulk
from ingestion.ingestor import Ingestor
class TestIngestor(unittest.TestCase):
def setUp(self):
self.ingestor = Ingestor()
@mock.patch("elasticsearch.helpers.bulk", mock.MagicMock(return_value=True))
def test_ingestor(self):
with open("tests/data/sample_payload.json", "r") as reader:
sample_messages = json.loads(reader.read())["Messages"]
actions = self.ingestor.ingest(sample_messages)
self.assertEqual(len(actions), 10)
但是,模拟似乎不起作用......当我运行它时,我得到一长串连接被拒绝的错误。
我如何解决它?
解决方案
事实证明我的补丁是错误的......这是我修复它的方法:
@mock.patch("elasticsearch.Elasticsearch.bulk",
mock.MagicMock(return_value={"items":[]}))
推荐阅读
- sharepoint - 如何将文件扩展名放入 Sharepoint 上的计算字段中
- udp - rtspsrc 到 udpsink 并将其消耗回来
- flutter - 无法运行“C:\Program Files\Go\bin\go.exe env”
- python - 在无限循环上运行的最后一个输出 - Python
- java - 如何在 Jersey Web 应用程序中获取文件资源?
- java - 使用 soot 生成调用图?
- swift - 如何在 SwiftUI 中进行自定义初始化以使用来自单独文件的参数
- user-interface - Windows 原生图形库
- firebase - Firebase 错误保存规则,第 1 行:输入不匹配
- sql-server - 创建查看查看结果的权限,无需访问其背后的表