spring - 春云流动活页夹
问题描述
我正在尝试实现一个能够自动缩放的 spring boot aws kinesis 消费者,以便与原始实例共享负载(拆分处理分片)。
我能够做的:使用定义明确的自述文件和此处提供的示例Kinesis binder docs我已经能够启动多个消费者,这些消费者实际上通过提供这些属性来划分分片以进行处理。
在生产者上,我通过应用程序属性提供 partitionCount: 2。在消费者身上,我提供了 instanceIndex 和 instanceCount。
在消费者 1 上我有 instanceIndex=0 和 instantCount =2,在消费者 2 上我有 instanceIndex=1 和 instantCount=2
这很好用,我有两个 Spring Boot 应用程序处理它们的特定分片。但在这种情况下,我必须为每个启动应用程序提供一个预配置的属性文件,该文件需要在加载时可用,以便它们拆分负载。如果我只启动第一个消费者(非自动缩放),我只处理特定于索引 0 的分片,而不处理其他分片。
我想做但不确定是否有可能部署一个消费者(处理所有分片)。如果我部署另一个实例,我希望该实例重温一些负载的第一个消费者,换句话说,如果我有 2 个分片和一个消费者,它将同时处理这两个,如果我随后部署另一个应用程序,我希望第一个消费者到现在只处理一个分片,将第二个分片留给第二个消费者。
我试图通过不在消费者上指定 instanceIndex 或 instanceCount 而只提供组名来做到这一点,但这会使第二个消费者闲置直到第一个消费者关闭。仅供参考,我还创建了自己的元数据和锁定表,防止活页夹创建默认值。
配置:生产者 -----------------
originator: KinesisProducer
server:
port: 8090
spring:
cloud:
stream:
bindings:
output:
destination: <stream-name>
content-type: application/json
producer:
headerMode: none
partitionKeyExpression: headers.type
消费者----------------------------------------
originator: KinesisSink
server:
port: 8091
spring:
cloud:
stream:
kinesis:
bindings:
input:
consumer:
listenerMode: batch
recordsLimit: 10
shardIteratorType: TRIM_HORIZON
binder:
checkpoint:
table: <checkpoint-table>
locks:
table: <locking-table
bindings:
input:
destination: <stream-name>
content-type: application/json
consumer:
concurrency: 1
listenerMode: batch
useNativeDecoding: true
recordsLimit: 10
idleBetweenPolls: 250
partitioned: true
group: mygroup
解决方案
这是正确的。这就是它现在的工作方式:如果有一个消费者在那里,它会占用所有分片进行处理。仅当第一个以某种方式损坏至少一个分片时,第二个才会采取行动。
适当的类似于 Kafka 的再平衡在我们的路线图上。我们还没有坚实的愿景,因此欢迎就此事提出问题和后续贡献!