Getting the HTTP response from a boto3 table.batch_writer object

Problem description

I have a list of data in a CSV that I want to put into a DynamoDB table on AWS. See the sample list below.

    Mary,F,7065
    Anna,F,2604
    Emma,F,2003
    Elizabeth,F,1939
    Minnie,F,1746
    Margaret,F,1578
    Ida,F,1472
    Alice,F,1414
    Bertha,F,1320
    Sarah,F,1288
    Annie,F,1258
    Clara,F,1226
    Ella,F,1156
    Florence,F,1063
    Cora,F,1045
    Martha,F,1040
    Laura,F,1012
    Nellie,F,995
    Grace,F,982
    Carrie,F,949
    Maude,F,858
    Mabel,F,808
    Bessie,F,796
    Jennie,F,793
    Gertrude,F,787
    Julia,F,783
    Hattie,F,769
    Edith,F,768
    Mattie,F,704
    Rose,F,700
    Catherine,F,688
    Lillian,F,672
    Ada,F,652
    Lillie,F,647
    Helen,F,636
    Jessie,F,635
    Louise,F,635
    Ethel,F,633
    Lula,F,621
    Myrtle,F,615
    Eva,F,614
    Frances,F,605
    Lena,F,603
    Lucy,F,590
    Edna,F,588
    Maggie,F,582
    Pearl,F,569
    Daisy,F,564
    Fannie,F,560
    Josephine,F,544
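For reference, rows like these can be loaded into a list of dicts with Python's csv module. This is a minimal sketch; the field names `name`, `gender`, and `count` are my own labels, since the question does not name the columns:

```python
import csv
import io

def load_names(fileobj):
    """Parse name,gender,count rows into DynamoDB-ready item dicts."""
    reader = csv.reader(fileobj)
    return [
        {"name": name, "gender": gender, "count": int(count)}
        for name, gender, count in reader
    ]

# In practice you would pass open("names.csv"); StringIO keeps the demo self-contained.
sample = io.StringIO("Mary,F,7065\nAnna,F,2604\n")
items = load_names(sample)
print(items[0])  # {'name': 'Mary', 'gender': 'F', 'count': 7065}
```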

To write more than 25 items to a DynamoDB table, the documentation uses a batch_writer object.

    import boto3

    resource = boto3.resource('dynamodb')
    table = resource.Table('Names')
    with table.batch_writer() as batch:
        for item in items:
            batch.put_item(Item=item)

Is there a way to get back an HTTP response indicating that the batch write completed successfully? I know it's asynchronous. Is there a wait, a get, or some other call I can make?

Tags: python-3.x, amazon-web-services, amazon-dynamodb, boto3

Solution


The BatchWriter object instantiated by batch_writer is defined in boto3's open-source dynamodb.table module. Looking at the BatchWriter class, the _flush method does receive a response; it just doesn't store it anywhere.

class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table.  The class handles
            batch writes to a single table.
        :type client: ``botocore.client.Client``
        :param client: A botocore client.  Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol.  What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.
        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.
        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']

        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()

How I solved it:

I built on the answers to a question about overriding class methods. They all work, but the best fit for my use case was to override the instance method with this version of _flush.

First, I built a new version of _flush.

import logging
import types

logger = logging.getLogger(__name__)

## New Flush

def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']

    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))


Then I overrode the instance method like this.

with table.batch_writer() as batch:
    batch._flush = types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)
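One caveat with this approach: _flush runs once per flush_amount items, so after the loop batch._response only holds the last flush's response, and earlier responses are lost. A variant that appends every response to a list fixes that. The sketch below is my own, not part of the original answer; to keep it runnable without AWS credentials it inlines a simplified writer based on the BatchWriter source above, with a stub in place of a live DynamoDB client:

```python
class CollectingBatchWriter:
    """Simplified BatchWriter that keeps every batch_write_item response."""

    def __init__(self, table_name, client, flush_amount=25):
        self._table_name = table_name
        self._client = client
        self._flush_amount = flush_amount
        self._items_buffer = []
        self.responses = []  # one entry per flush

    def put_item(self, Item):
        self._items_buffer.append({'PutRequest': {'Item': Item}})
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        self.responses.append(response)  # keep every response, not just the last
        unprocessed = response['UnprocessedItems'].get(self._table_name)
        if unprocessed:
            self._items_buffer.extend(unprocessed)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # Flush whatever is left, just like the real BatchWriter.
        while self._items_buffer:
            self._flush()


class StubClient:
    """Stand-in for boto3's DynamoDB client, for demonstration only."""

    def batch_write_item(self, RequestItems):
        return {'UnprocessedItems': {},
                'ResponseMetadata': {'HTTPStatusCode': 200}}


with CollectingBatchWriter('Names', StubClient()) as batch:
    for i in range(60):
        batch.put_item(Item={'name': f'name{i}', 'count': i})

print(len(batch.responses))  # 60 items at 25 per flush -> 3 responses
```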

This produces output like the following.

{'UnprocessedItems': {},
 'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'server': 'Server',
   'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
   'content-type': 'application/x-amz-json-1.0',
   'content-length': '23',
   'connection': 'keep-alive',
   'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
   'x-amz-crc32': '4185382645'},
  'RetryAttempts': 0}}
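A quick success check on a response like this (my own sketch) looks at both the HTTP status code and the UnprocessedItems map, since a 200 status alone does not guarantee every item was written:

```python
def batch_succeeded(response):
    """True if the batch write returned 200 and left nothing unprocessed."""
    ok_status = response['ResponseMetadata']['HTTPStatusCode'] == 200
    nothing_left = not response['UnprocessedItems']
    return ok_status and nothing_left

sample = {'UnprocessedItems': {},
          'ResponseMetadata': {'HTTPStatusCode': 200}}
print(batch_succeeded(sample))  # True
```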
