amazon-web-services - 如何使用 moto 配置 AWS Firehose(Python AWS 模拟库)
问题描述
我正在创建这样的 Firehose 资源,以及一个名为 self.problem_reporter_bucket_name 的 s3 存储桶。但是,在调用 put_record 之后,我的存储桶中什么都没有。也就是说,当我在我的存储桶上调用 list_objects 时,没有项目。
self.firehose.create_delivery_stream(
DeliveryStreamName=self.problem_reporter_delivery_stream_name,
S3DestinationConfiguration={
'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
'BucketARN': 'arn:aws:s3:::' + self.problem_reporter_bucket_name,
'Prefix': 'myPrefix',
'BufferingHints': {
'SizeInMBs': 1,
'IntervalInSeconds': 60
},
'CompressionFormat': 'UNCOMPRESSED',
})
)
甚至 moto 都支持我的用例吗?
解决方案
moto 似乎不支持 Firehose。我做了这样的事情来测试我的 Firehose 相关代码;
如果在测试的模块中定义了 firehose 资源:
from unittest.mock import patch, MagicMock
@patch('mymodule.boto3')
def test_put_record(boto3):
record = {'ID": "123'}
my_put_firehose_record(record)
all_args = {
'DeliveryStreamName': 'my-test-firehose-stream',
'Record': record
}
boto3.client.assert_called_with('firehose')
boto3.client().put_record.assert_called_with(**all_args)
如果在被测试的模块之外定义了 firehouse 资源:
from unittest.mock import patch, MagicMock
def test_put_record():
firehose = MagicMock()
record = {'ID': '123'}
my_put_firehose_record_external(firehose, record)
all_args = {
'DeliveryStreamName': 'my-test-firehose-stream',
'Record': record
}
firehose.put_record.assert_called_with(**all_args)
推荐阅读
- r - 在 R 中使用可变时区将 UTC 时间转换为本地时间
- c++ - 无法将我的基类的双派生类转换为我的基类 C++
- javascript - SuiteScript 2.0 中的自定义模块
- wordpress - AWS eb deploy 不显示我对 css 文件所做的更改
- python-3.x - Flask SQL Alchemy 连接多个表
- python-3.x - 温度转换,无需调用方法
- html - 居中超大内容,但确保滚动仍然有效
- elasticsearch - Logstash 将 mongo 数据同步到 elasticsearch
- python - Pygame 时钟和事件循环
- php - 如何传递数据库数据并在yii2 highCharts中显示?