首页 > 解决方案 > Spring Cloud Stream Kinesis Binder - 并发

问题描述

我用以下组件构建了一个 spring boot kinesis 消费者:

我使用1 shard 消耗来自 kinesis 流的事件。此外,这个 Spring Boot 消费者应用程序正在Pivotal Cloud Foundry Platform中运行。

在发布此问题之前,我在本地(使用 kinesalite)和 PCF(使用 kinesis 流)中尝试了该场景。您能否确认我的理解是否正确?我浏览了 spring 云流文档(https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/https://github.com/spring-cloud/spring-cloud-流-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。尽管文档很详尽,但并未详细解释并发性和高可用性。

假设我有 3 个消费者实例部署到 PCF(通过在 cf push 期间使用的manifest.yml文件中将实例属性设置为 3)。

所有 3 个实例都具有以下属性

spring.cloud.stream.bindings..consumer.concurrency=5

spring.cloud.stream.bindings..group=我的消费者组

spring.cloud.stream.kinesis.binder.checkpoint.table=my-metadata-dynamodb-table

spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table

假设生产者按此顺序将事件发送到 kinesis

event5(流中的最新事件) - event4 - event3 - event2 - event1(流中的第一个事件)

对于这样的配置,我在下面解释了我的理解。你能确认这是否正确吗?

  1. 在给定时间点,只有一个消费者实例处于活动状态,它将处理发送到 kinesis 流的所有事件(因为该流只有一个分片)。仅当主实例关闭时,其他 2 个实例之一才会获得控制权。此配置是为了确保高可用性并保留消息的顺序。
  2. 由于 PCF 的 manifest.yml 中设置了实例数,因此我无需担心设置 spring.cloud.stream.instanceCount 或 spring.cloud.stream.bindings..consumer.instanceCount 属性。
  3. 当 Spring Boot 消费者启动/启动时,有 5 个消费者线程处于活动状态(因为并发设置为 5)。现在事件按照上面解释的顺序被消费。Thread1 接收 event1。当 thread1 仍在积极处理 event1 时,另一个线程只是从流中挑选并开始处理下一个事件(thread2 处理 event2 等等......)。尽管在这种情况下保留了事件的顺序(事件 1 总是在事件 2 之前被拾取,依此类推......),但不能保证线程 1 会在线程 2 之前完成对事件 1 的处理。
  4. 当所有 5 个线程都忙于处理流中的 5 个事件时,如果有新事件说 event6 和 event7 进来,消费者必须等待一个线程可用。比如说,thread3 处理完 event3 并且其他线程仍在忙于处理事件,thread3 将拿起 event6 并开始处理,但由于没有可用的线程,event7 仍然没有被拿起。
  5. 默认情况下,并发设置为 1。如果您的业务需求要求您在处理下一个事件之前完成第一个事件的处理,那么并发应该为 1。在这种情况下,您会影响吞吐量。您一次只能消费一个事件。但是,如果吞吐量很重要,并且您希望在给定时间点处理多个事件,则应将并发性设置为所需的值。增加分片数量也是一种选择,但作为消费者,如果您不能要求增加,这是实现并行/吞吐量的最佳选择。

标签: javaspring-integrationspring-cloud-streamamazon-kinesiscloud-foundry

解决方案


请参阅concurrency以下选项中的 JavaDocs 选项KinesisMessageDrivenChannelAdapter

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

因此,由于您在该流中只有一个分片,因此将只有一个活动线程ShardIterator在该单个分片上迭代 s。

关键是我们总是必须在单个线程中处理来自单个分片的记录。通过这种方式,我们保证了正确的顺序,并为最高的序列号完成了检查点。

请进一步调查什么是 AWS Kinesis 及其工作原理。


推荐阅读