python-3.x - Getting the HTTP response from a boto3 table.batch_writer object
Question
I have a list of data in a CSV that I want to put into a DynamoDB table on AWS. See the sample list below.
Mary,F,7065
Anna,F,2604
Emma,F,2003
Elizabeth,F,1939
Minnie,F,1746
Margaret,F,1578
Ida,F,1472
Alice,F,1414
Bertha,F,1320
Sarah,F,1288
Annie,F,1258
Clara,F,1226
Ella,F,1156
Florence,F,1063
Cora,F,1045
Martha,F,1040
Laura,F,1012
Nellie,F,995
Grace,F,982
Carrie,F,949
Maude,F,858
Mabel,F,808
Bessie,F,796
Jennie,F,793
Gertrude,F,787
Julia,F,783
Hattie,F,769
Edith,F,768
Mattie,F,704
Rose,F,700
Catherine,F,688
Lillian,F,672
Ada,F,652
Lillie,F,647
Helen,F,636
Jessie,F,635
Louise,F,635
Ethel,F,633
Lula,F,621
Myrtle,F,615
Eva,F,614
Frances,F,605
Lena,F,603
Lucy,F,590
Edna,F,588
Maggie,F,582
Pearl,F,569
Daisy,F,564
Fannie,F,560
Josephine,F,544
To write more than 25 items to a DynamoDB table, the documentation uses a batch_writer object.
import boto3

resource = boto3.resource('dynamodb')
table = resource.Table('Names')

with table.batch_writer() as batch:
    for item in items:
        batch.put_item(Item=item)
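Here `items` is a list of dicts built from the CSV rows. A minimal sketch of that step, for context (the file name `names.csv` and the attribute names `name`, `sex`, and `count` are my assumptions, not from the original post):

import csv

# Parse each CSV row into a DynamoDB item dict.
# Column names are assumed; adjust them to your table's schema.
items = []
with open('names.csv', newline='') as f:
    for name, sex, count in csv.reader(f):
        items.append({'name': name, 'sex': sex, 'count': int(count)})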
Is there a way to get back an HTTP response indicating that the batch write completed successfully? I know this is asynchronous. Is there a wait, a get, or some other call for this?
Solution
The source for the BatchWriter object instantiated by batch_writer lives in boto3 (boto3/dynamodb/table.py). Looking at the BatchWriter class, the _flush method does produce a response; it just doesn't store it anywhere.
class BatchWriter(object):
    """Automatically handle batch writes to DynamoDB for a single table."""
    def __init__(self, table_name, client, flush_amount=25,
                 overwrite_by_pkeys=None):
        """
        :type table_name: str
        :param table_name: The name of the table.  The class handles
            batch writes to a single table.

        :type client: ``botocore.client.Client``
        :param client: A botocore client.  Note this client
            **must** have the dynamodb customizations applied
            to it for transforming AttributeValues into the
            wire protocol.  What this means in practice is that
            you need to use a client that comes from a DynamoDB
            resource if you're going to instantiate this class
            directly, i.e
            ``boto3.resource('dynamodb').Table('foo').meta.client``.

        :type flush_amount: int
        :param flush_amount: The number of items to keep in
            a local buffer before sending a batch_write_item
            request to DynamoDB.

        :type overwrite_by_pkeys: list(string)
        :param overwrite_by_pkeys: De-duplicate request items in buffer
            if match new request item on specified primary keys. i.e
            ``["partition_key1", "sort_key2", "sort_key3"]``
        """
        self._table_name = table_name
        self._client = client
        self._items_buffer = []
        self._flush_amount = flush_amount
        self._overwrite_by_pkeys = overwrite_by_pkeys

    def put_item(self, Item):
        self._add_request_and_process({'PutRequest': {'Item': Item}})

    def delete_item(self, Key):
        self._add_request_and_process({'DeleteRequest': {'Key': Key}})

    def _add_request_and_process(self, request):
        if self._overwrite_by_pkeys:
            self._remove_dup_pkeys_request_if_any(request)
        self._items_buffer.append(request)
        self._flush_if_needed()

    def _remove_dup_pkeys_request_if_any(self, request):
        pkey_values_new = self._extract_pkey_values(request)
        for item in self._items_buffer:
            if self._extract_pkey_values(item) == pkey_values_new:
                self._items_buffer.remove(item)
                logger.debug("With overwrite_by_pkeys enabled, skipping "
                             "request:%s", item)

    def _extract_pkey_values(self, request):
        if request.get('PutRequest'):
            return [request['PutRequest']['Item'][key]
                    for key in self._overwrite_by_pkeys]
        elif request.get('DeleteRequest'):
            return [request['DeleteRequest']['Key'][key]
                    for key in self._overwrite_by_pkeys]
        return None

    def _flush_if_needed(self):
        if len(self._items_buffer) >= self._flush_amount:
            self._flush()

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            # Any unprocessed_items are immediately added to the
            # next batch we send.
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []
        logger.debug("Batch write sent %s, unprocessed: %s",
                     len(items_to_send), len(self._items_buffer))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        # When we exit, we need to keep flushing whatever's left
        # until there's nothing left in our items buffer.
        while self._items_buffer:
            self._flush()
How I solved it:
I built on the answers to this question about overriding class methods. They all work, but the approach that best fit my use case was to override _flush on the class instance with the version below.
First, I built a new version of _flush.
import logging
import types

logger = logging.getLogger(__name__)

## New Flush
def _flush(self):
    items_to_send = self._items_buffer[:self._flush_amount]
    self._items_buffer = self._items_buffer[self._flush_amount:]
    # Store the response on the instance instead of discarding it.
    self._response = self._client.batch_write_item(
        RequestItems={self._table_name: items_to_send})
    unprocessed_items = self._response['UnprocessedItems']
    if unprocessed_items and unprocessed_items[self._table_name]:
        # Any unprocessed_items are immediately added to the
        # next batch we send.
        self._items_buffer.extend(unprocessed_items[self._table_name])
    else:
        self._items_buffer = []
    logger.debug("Batch write sent %s, unprocessed: %s",
                 len(items_to_send), len(self._items_buffer))
Then I overrode the instance method like this.
with table.batch_writer() as batch:
    batch._flush = types.MethodType(_flush, batch)
    for item in items:
        batch.put_item(Item=item)
print(batch._response)
This produces output like the following.
{'UnprocessedItems': {},
'ResponseMetadata': {'RequestId': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'HTTPStatusCode': 200,
'HTTPHeaders': {'server': 'Server',
'date': 'Fri, 29 Mar 2019 18:29:49 GMT',
'content-type': 'application/x-amz-json-1.0',
'content-length': '23',
'connection': 'keep-alive',
'x-amzn-requestid': '853HSV0ULO4BN71R6T895J991VVV4KQNSO5AEMVJF66Q9ASUAAJ',
'x-amz-crc32': '4185382645'},
'RetryAttempts': 0}}
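If you would rather not patch each instance by hand, the same idea can be packaged as a subclass of BatchWriter. This is my own sketch, not part of the original answer; the class name and the responses list are assumptions. It relies only on the constructor signature and _flush body shown in the source above:

from boto3.dynamodb.table import BatchWriter


class TrackedBatchWriter(BatchWriter):
    """BatchWriter that records every batch_write_item response."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.responses = []

    def _flush(self):
        items_to_send = self._items_buffer[:self._flush_amount]
        self._items_buffer = self._items_buffer[self._flush_amount:]
        response = self._client.batch_write_item(
            RequestItems={self._table_name: items_to_send})
        self.responses.append(response)  # keep the full response
        unprocessed_items = response['UnprocessedItems']
        if unprocessed_items and unprocessed_items[self._table_name]:
            self._items_buffer.extend(unprocessed_items[self._table_name])
        else:
            self._items_buffer = []


with TrackedBatchWriter(table.name, table.meta.client) as batch:
    for item in items:
        batch.put_item(Item=item)

for response in batch.responses:
    print(response['ResponseMetadata']['HTTPStatusCode'])

The upside over patching is that you get the response for every flush, not just the last one; the downside is that you are still copying a private method's body, so it can break if boto3's internals change.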