首页 > 解决方案 > Kinesis 分片读取限制为 2 mib,那么获取记录调用怎么可能达到 10mib

问题描述

我指的是这个文档,它说“每个分片可以通过 GetRecords 支持高达每秒 2 MiB 的最大总数据读取速率。如果对 GetRecords 的调用返回 10 MiB,则在接下来的 5 秒内进行的后续调用会引发异常。” 我试图了解 getRecords 调用如何获得超过 2mib 的分片限制的(10Mib)?碎片在达到 2 mib 限制后不会停止/抛出错误吗?

提前致谢

标签: amazon-kinesis

解决方案


这句话看起来自相矛盾。他们应该改写它。

您应该考虑文档中的前两个语句以了解上下文。

摘自上述文件,

GetRecords 每次调用最多可以从单个分片中检索 10 MiB 的数据,每次调用最多可以检索 10,000 条记录。对 GetRecords 的每次调用都计为一次读取事务。

每个分片每秒最多可支持五个读取事务。每个读取事务最多可提供 10,000 条记录,每个事务的上限为 10 MiB。

每个分片可以通过 GetRecords 支持高达每秒 2 MiB 的最大总数据读取速率。如果对 GetRecords 的调用返回 10 MiB,则在接下来的 5 秒内进行的后续调用将引发异常。

根据我对 Kinesis 的经验,他们的实际意思是,每个分片对 GetRecords 调用具有每秒 2 MiB 的读取速率限制,并且该速率限制是在 GetRecords 调用开始时超过一秒的窗口计算的。

我不确定 Kinesis 的内部实现,但我知道Kafka 的内部。在 Kafka 中,分区(与 Kinesis 中的分片相同)进一步划分为段,这些段基本上是日志文件。因此,每条消息都作为一个条目存储在日志文件中。

我怀疑他们已经通过以下方式实现了 GetRecords 服务器端 API,

pythonish伪代码:

current_timestamp = datetime.now
seconds_diff = (LAST_SUCCESSFUL_CALL.timestamp - current_timestamp).total_seconds()
if LAST_SUCCESSFUL_CALL.data_size > (seconds_diff * 2 Mib):
  LAST_SUCCESSFUL_CALL.data_size = LAST_SUCCESSFUL_CALL.data_size - (seconds_diff * 2 Mib)
  throw Error
else
  records = data_store.find_next_records_from_segments(10 MiB)
  # Here, implementation does not limit the records because sequential disk reading is always faster. 
  # So, It will be better to get as much records it has with some upper cap of 10 MiB or till the end of segment. 
  LAST_SUCCESSFUL_CALL.data_size = records.data_size
  LAST_SUCCESSFUL_CALL.timestamp = current_timestamp
  return records

通过将速率限制检查分散到之前的调用中,他们正在简化实现。

它也最适合消费者可以快速赶上记录的流处理应用程序。

例如,假设发生以下事件

T1 -> Ingest 1 MiB in shard, Consumer is busy on processing fetched data, Pending data = 1 MiB
T2 -> Ingest 1 MiB in shard, Consumer is busy on processing fetched data, Pending data = 2 MiB
T3 -> Ingest 1 MiB in shard, Consumer is busy on processing fetched data, Pending data = 3 MiB
T4 -> Ingest 1 MiB in shard, Consumer is busy on processing fetched data, Pending data = 4 MiB
T5 -> Ingest 1 MiB in shard, Consumer is busy on processing fetched data, Pending data = 5 MiB
T6 -> Ingest 1 MiB in shard, Consumer becomes idle and does GetRecords, gets 5 MiB data, Pending data = 1 MiB
T7 -> No new data ingestion, Consumer is busy on processing fetched data 
T8 -> No new data ingestion, Consumer is busy on processing fetched data 
T9 -> Consumer becomes Idle and does GetRecords, gets 1 MiB data. Pending data = 0 MiB 

因此,T7 到 T8,消费者使用 2 秒来完全处理 5 MiB 的数据,而不是分别为每个 2 MiB 的数据制作 GetRecords。在这里,我们正在保存网络调用和磁盘寻道。

综上所述,

碎片在达到 2 mib 限制后不会停止/抛出错误吗?

不,它不会。但是在随后几秒钟内进行的 GetRecords 会引发错误。但大多数情况下,您的消费者将花费随后的几秒钟处理您在第一次 GetRecords 调用时收到的 10 MiB 数据,而不是查询新数据。因此,您不必担心太多。


推荐阅读