首页 > 解决方案 > Kafka Streams (2.6) 备用副本在滚动重启时的预期行为

问题描述

关于状态存储恢复上下文中 Kafka Streams (2.6) 的具体行为的问题。我目前有一个主题有 50 个分区,运行 5 个应用程序实例(每个有 30 个配置线程(备用副本有 20 个额外线程);每个实例都是一个 Spring Boot 应用程序实例)。我为每个实例配置了 2 个备用副本:

IE

spring.cloud.stream.kafka.streams.binder.configuration.num.standby.replicas=2

在使用完全饱和的状态存储启动负载测试后,该存储仅执行先前从 4 小时负载饱和的聚合(此时状态存储的大小约为 250 GB),以每秒 250 个事务处理 5 个实例,我反弹其中一个实例。

在这一点上,给定备用副本配置的预期是反弹实例的流任务只会迁移到另一个实例上的备用副本,并且当新实例出现时,只会再次迁移到新实例(但仅当它完全赶上)?

基于 KIP-441 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams ) 和这个 ( https://www.confluent .io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/)我认为会是这样。

但目前看到所有实例似乎都受到影响的重新平衡,更糟糕的是,看起来有些实例需要几分钟时间。恢复。此外,当我在没有备用副本的情况下运行此测试时,我只看到循环实例受到影响,恢复时间为 55 秒。(看起来备用副本让事情变得更糟)。

试图确定我在哪里丢失了一些东西,因为我信任文档,但无法协调当前的行为。对于上下文,这是有问题的 kstream:

   @Bean
   public Function<KStream<String, String>, KStream<String, AggregatedMessage>> aggregate() {
       Serde<AggregatedMessage> serializer = new JsonSerde<>(AggregatedMessage.class);
       return ks ->
               ks.peek((k, v) -> {
                   if (lastProcessed.get() != -1) {
                       long inactivity = System.currentTimeMillis() - lastProcessed.get();
                       eventMetrics.recordInterEventLatency(inactivity);
                   }
                   lastProcessed.set(System.currentTimeMillis());
                   eventMetrics.increment();
               })
                       .mapValues(this::map)
                       .filter((k, v) -> v != null)
                       .groupByKey()
                       .aggregate(AggregatedMessage::new, this::process,
                               Materialized.<String, AggregatedMessage, KeyValueStore<Bytes, byte[]>>as(aggregatedStateStore)
                                       // disabling in-memory caching ensures all k-table state changes are flushed to the internal
                                       // topic (relevant when performing join operations between two k-tables to ensures the cache
                                       // doesn't perform compaction in which case a join would result in possibly only the latest)
                                       // if you only care about eventual consistency, then enable caching...
                                       .withCachingDisabled()
                                       .withKeySerde(Serdes.String())
                                       .withValueSerde(serializer)
                                       .withLoggingEnabled(topicConfig))
                       .toStream();
   }
spring.cloud.stream.bindings.aggregate-in-0.destination=${aggregation-in-stream-topic}
spring.cloud.stream.bindings.aggregate-in-0.content-type=application/json
spring.cloud.stream.bindings.aggregate-in-0.consumer.header-mode=none
spring.cloud.stream.bindings.aggregate-in-0.consumer.concurrency=30

spring.cloud.stream.kafka.streams.binder.configuration.replication.factor=3
spring.cloud.stream.kafka.streams.binder.configuration.num.standby.replicas=2

标签: apache-kafkaapache-kafka-streams

解决方案


推荐阅读