Weird Spring Kafka consumer issue: two consumers with the same configuration behave differently

Problem description

At the moment the consumer is working as expected, and we are not sure why, since no configuration was changed. We will update the question once we have more information.

We are deploying a new Kafka pipeline to process data. We have two consumers listening to the same topic. One consumer works as expected, while the other does not.
The expectation is that both listeners should only process new messages. Listener 1 reprocesses all messages again and again (not expected), while Listener 2 only processes new messages (expected). We are not entirely sure why this happens, since both listeners have a similar configuration and listen to the same topic.
Average processing time of Listener 1 = 4-5 seconds.
Average processing time of Listener 2 = 3-4 seconds.
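
Not part of the original post, but a quick way to see what each listener's consumer group has actually committed is Kafka's AdminClient: if the committed offsets for "updateCache" lag or reset while "updateSolr" keeps advancing, the reprocessing is an offset/rebalance issue rather than a data issue. A minimal diagnostic sketch (the bootstrap addresses are copied from the configuration below; everything else is illustrative):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Bootstrap servers taken from the question's configuration.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "10.99.2.135:9093,10.99.2.136:9093,10.99.2.134:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            // Both listener-level group IDs from the question.
            for (String group : new String[] {"updateCache", "updateSolr"}) {
                Map<TopicPartition, OffsetAndMetadata> offsets =
                        admin.listConsumerGroupOffsets(group)
                             .partitionsToOffsetAndMetadata()
                             .get();
                System.out.println("Committed offsets for group " + group + ": " + offsets);
            }
        }
    }
}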

Kafka consumer factory.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        Map<String,Object> configs=new HashMap<>();
        String bootstrap=System.getenv().getOrDefault("BOOTSTRAP_SERVERS_CONFIG","10.99.2.135:9093,10.99.2.136:9093,10.99.2.134:9093");
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        configs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");
        configs.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "20000");
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        //configs.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "update");
        return new DefaultKafkaConsumerFactory<>(configs,new StringDeserializer(),new JsonDeserializer<>(Object.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(5);
        factory.getContainerProperties().setPollTimeout(10000);
        return factory;
    }
    
    @Bean
      public ProducerFactory<String, String> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        String bootstrap=System.getenv().getOrDefault("BOOTSTRAP_SERVERS_CONFIG","10.99.2.135:9093,10.99.2.136:9093,10.99.2.134:9093");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
      }

      @Bean
      public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerConfigs());
      }
}
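
A side note on the factory above (not part of the original question): both listeners share this factory, so both run with max.poll.interval.ms = 60 000 ms, enable.auto.commit = true and the default max.poll.records of 500. With 4-5 seconds per record, a single poll batch can easily take longer than 60 seconds; the consumer is then removed from the group, auto-commit (which only happens on the next poll) never runs, and after the rebalance the partitions are re-delivered from the last committed offset. A purely illustrative fragment that keeps each batch inside the interval (the values are assumptions, not a confirmed fix):

// Illustrative only -- values are assumptions, not a confirmed fix.
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);           // ~10 records * 5 s = 50 s per batch, under 60 s
configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // or give each batch more headroom instead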

Listener 1, which reprocesses all messages after some time.

@KafkaListener(groupId="updateCache",topics="omp-orderBuffApi",containerFactory="kafkaListenerContainerFactory")
        public void consume(Object object){
          long time = System.currentTimeMillis();
         logger.info("Start Time of consumer for Details Cache:  {}" ,time);
         ConsumerRecord<String, Object> consumerRecord = (ConsumerRecord<String, Object>) object;
            LinkedHashMap<String, Object> map=(LinkedHashMap<String, Object>) consumerRecord.value();
            logger.info("Payload for Details Cache  : {}" ,map.get("payload"));
            Map<String,Object> payload=(Map<String, Object>) map.get("payload");
            String order = (String) payload.get("ORDERITEMNUMBER");       
            logger.info("OrderItemNumber  :  {}",order);
            long atime=System.currentTimeMillis();
            logger.info("Start Time of api - Details Cache:  {}" ,atime);
            solrController.updateOrderDetailsCache1(payload);
            float atimex = (System.currentTimeMillis() - atime)/1000F;
            logger.info("End Time of api -  Details Cache:  {}" ,atimex);
            float timex = (System.currentTimeMillis() - time)/1000F;
            logger.info("End Time of consumer for Details Cache:  {}" ,timex);
            logger.info("***************************************************");
            countDownLatch0.countDown();
        }
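
One detail worth noting: the groupId on the @KafkaListener annotation ("updateCache" / "updateSolr") overrides the group.id ("update") configured in the factory, so the two listeners really are in separate consumer groups despite sharing the factory. As a non-authoritative variation, the same listener could accept a typed ConsumerRecord instead of Object, which removes the unchecked cast and makes it trivial to log partition and offset and confirm whether the same offsets are really being re-delivered (body trimmed to the essentials):

@KafkaListener(groupId = "updateCache", topics = "omp-orderBuffApi", containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, Object> record) {
    // If the reprocessing is a rebalance/offset problem, the same partition/offset pairs reappear here.
    logger.info("partition={} offset={}", record.partition(), record.offset());
    Map<String, Object> map = (Map<String, Object>) record.value();
    Map<String, Object> payload = (Map<String, Object>) map.get("payload");
    solrController.updateOrderDetailsCache1(payload);
}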

Listener 2, which works as expected and does not reprocess any messages.

@KafkaListener(groupId = "updateSolr", topics = "omp-orderBuffApi", containerFactory = "kafkaListenerContainerFactory")
    public void consumerForSolr(Object object) {
        long time = System.currentTimeMillis();
        logger.info("Start Time of consumer for Sorl Details Cache: {}", time);
        ConsumerRecord<String, Object> consumerRecord = (ConsumerRecord<String, Object>) object;
        LinkedHashMap<String, Object> map = (LinkedHashMap<String, Object>) consumerRecord.value();
        
        Map<String, Object> payload = (Map<String, Object>) map.get("payload");
         logger.info("payload for Solr Details : {} ", payload);
        String order = (String) payload.get("ORDERITEMNUMBER");
         logger.info("OrderItemNumber Sorl Details Cache: {} ", order);
        long atime = System.currentTimeMillis();
        logger.info("Start Time of api - Solr Details Cache: {}", atime);
        try {
            solrController.updateSolrDate(payload);
        } catch (SolrServerException | IOException | ParseException e) {
            e.printStackTrace();
        }

        float atimex = (System.currentTimeMillis() - atime) / 1000F;
        logger.info("End Time of api - Solr Details Cache: {}", atimex);
        float timex = (System.currentTimeMillis() - time) / 1000F;
        logger.info("end Time of consumer - Solr Details Cache: {}", timex);
        logger.info("***************************************************");
        countDownLatch3.countDown();
    }

Tags: java, apache-kafka, spring-kafka
