首页 > 解决方案 > Dynamodb boto3 dynamodbstreams get_records

问题描述

我需要创建一个方法来允许我检索由我的 dynamodb 表中的流存储的记录。我以前在我的表基础中激活流,此外还执行了一些更新来生成这些流记录。

现在,为了更深入地了解如何实现这一点,我正在查看AWS 文档。因此,在boto3中,理论上可以检索这些记录的完整功能。

我一直在尝试部署该方法get_records,因为按照结构我可能会传递一个 ShardID。但后来我不知道如何生成该分片 ID。我尝试运行 a来获取分片并将其用作最终获得分片迭代器并触发describe_stream所需的 shardID ,但使用的 Shard ID 不是正确的。这是我的代码:get_shard_iteratorget_records

import boto3


client = boto3.resource('dynamodb')
clients = boto3.client('dynamodbstreams')
table = client.Table('songs')

  # How I suppose I can get a shard ID
 response = clients.describe_stream(
     StreamArn='arn:aws:dynamodb:region:account:table/songs/stream/2018-09-04T00:03:49.742',
     Limit=3, )
 print(response['StreamDescription']['Shards'])


# Now I pass the shard ID to get the shared iterator
response = clients.get_shard_iterator(
    ShardId='00000001536019433750-85f234d8',
    ShardIteratorType='TRIM_HORIZON',
    StreamArn='arn:aws:dynamodb:region:account:table/songs/stream/2018-09-04T00:03:49.742',
)
print(response)

错误:

botocore.errorfactory.ResourceNotFoundException:调用GetShardIterator操作时发生错误(ResourceNotFoundException):未找到请求的资源:分片不存在

非常感谢!

标签: amazon-web-servicesamazon-dynamodbboto3amazon-dynamodb-streams

解决方案


假设您的是单个分片流,请尝试以下步骤获取 ShardId :

import boto3


client = boto3.resource('dynamodb')
clients = boto3.client('dynamodbstreams')
table = client.Table('songs')

  # How I suppose I can get a shard ID
 response = clients.describe_stream(
     StreamArn='arn:aws:dynamodb:region:account:table/songs/stream/2018-09-04T00:03:49.742',
     Limit=3, )
 print(response['StreamDescription']['Shards'][0]['ShardId'])


# Now I pass the shard ID to get the shared iterator
response = clients.get_shard_iterator(
    ShardId=response['StreamDescription']['Shards'][0]['ShardId'],
    ShardIteratorType='TRIM_HORIZON',
    StreamArn='arn:aws:dynamodb:region:account:table/songs/stream/2018-09-04T00:03:49.742',
)

print(response)

推荐阅读