首页 > 解决方案 > 如何使用 moto 配置 AWS Firehose(Python AWS 模拟库)

问题描述

我正在创建这样的 Firehose 资源,以及一个名为 self.problem_reporter_bucket_name 的 s3 存储桶。但是,在调用 put_record 之后,我的存储桶中什么都没有。也就是说,当我在我的存储桶上调用 list_objects 时,没有项目。

self.firehose.create_delivery_stream(
  DeliveryStreamName=self.problem_reporter_delivery_stream_name,
  S3DestinationConfiguration={
    'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
    'BucketARN': 'arn:aws:s3:::' + self.problem_reporter_bucket_name,
    'Prefix': 'myPrefix',
    'BufferingHints': {
      'SizeInMBs': 1,
      'IntervalInSeconds': 60
    },
    'CompressionFormat': 'UNCOMPRESSED',
 })
)

甚至 moto 都支持我的用例吗?

标签: amazon-web-servicesmoto

解决方案


moto 似乎不支持 Firehose。我做了这样的事情来测试我的 Firehose 相关代码;

如果在测试的模块中定义了 firehose 资源:

from unittest.mock import patch, MagicMock
@patch('mymodule.boto3')
def test_put_record(boto3):
    record = {'ID": "123'}
    my_put_firehose_record(record)

    all_args = {
        'DeliveryStreamName': 'my-test-firehose-stream',
        'Record': record
    }

    boto3.client.assert_called_with('firehose')
    boto3.client().put_record.assert_called_with(**all_args)

如果在被测试的模块之外定义了 firehouse 资源:

from unittest.mock import patch, MagicMock
def test_put_record():
    firehose = MagicMock()
    record = {'ID': '123'}
    my_put_firehose_record_external(firehose, record)

    all_args = {
        'DeliveryStreamName': 'my-test-firehose-stream',
        'Record': record
    }

    firehose.put_record.assert_called_with(**all_args)

推荐阅读