首页 > 解决方案 > 使用 @StreamListener 时,对 KafkaListenerContainerFactory 的自定义会反映在生成的 KafkaMessageListenerContainer 中吗?

问题描述

我正在使用带有 kafka binder 的 spring-cloud-stream 来使用来自 kafka 的消息。该应用程序基本上是使用来自 kafka 的消息并更新数据库。

在某些情况下,DB 出现故障(可能会持续数小时)或其他一些临时技术问题。由于在这些情况下,在有限的时间内重试消息然后将其移动到 DLQ 是没有意义的,所以当我们遇到某些类型的异常(例如 DBHostNotAvaialableException)时,我试图实现无限次数的重试

为了实现这一点,我尝试了两种方法(两种方法都面临问题) -

  1. 在这种方法中,尝试在配置 ConcurrentKafkaListenerContainerFactory bean 时在容器属性上设置错误处理程序,但错误处理程序根本没有被触发。在调试流程时,我在创建的 KafkaMessageListenerContainer 中实现了 errorHandler 字段,因此它们使用默认的 LoggingErrorHandler。下面是我的容器工厂 bean 配置 - 这种方法的 @StreamListener 方法与第二种方法相同,除了对消费者的搜索。

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> 
     kafkaListenerContainerFactory(ConsumerFactory<String, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(kafkaConsumerFactory);
        factory.getContainerProperties().setAckOnError(false);
        ContainerProperties containerProperties = factory.getContainerProperties();
         // even tried a custom implementation of RemainingRecordsErrorHandler but call never went in to the implementation
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        return factory;
    }
    

我在配置工厂 bean 时是否遗漏了一些东西,或者这个 bean 仅与 @KafkaListener 而不是 @StreamListener 相关?

  1. 第二种选择是尝试使用手动确认和查找来实现它,在 @StreamListener 方法中从标头获取确认和使用者,如果收到可重试的异常,我会使用 retrytemplate 进行一定次数的重试,当这些重试用尽时我会触发consumer.seek(). 下面的示例代码 -

    @StreamListener(MySink.INPUT)
    public void processInput(Message<String> msg) {
    
    MessageHeaders msgHeaders = msg.getHeaders();
    Acknowledgment ack = msgHeaders.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    Consumer<?,?> consumer = msgHeaders.get(KafkaHeaders.CONSUMER, Consumer.class);
    Integer partition = msgHeaders.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
    String topicName = msgHeaders.get(KafkaHeaders.RECEIVED_TOPIC, String.class);
    Long offset = msgHeaders.get(KafkaHeaders.OFFSET, Long.class);
    
    
    try {
      retryTemplate.execute(
                context -> {
                 // this is a sample service call to update database which might throw retryable exceptions like DBHostNotAvaialableException
                    consumeMessage(msg.getPayload());
                    return null;
                }
        );
    }
    catch (DBHostNotAvaialableException ex) {
      // once retries as per retrytemplate are  exhausted do a seek
    
        consumer.seek(new TopicPartition(topicName, partition), offset);
    
    }
    catch (Exception ex) {
      // if some other exception just log and put in dlq based on enableDlq property
        logger.warn("some other business exception hence putting in dlq ");
        throw ex;
    }
    
    if (ack != null) {
        ack.acknowledge();
    }
    

    }

这种方法的问题- 因为我正在执行 consumer.seek() 而可能有来自上次轮询的待处理记录,如果 DB 在此期间出现(因此出现故障),这些记录可能会被处理和提交。有没有办法在执行搜索时清除这些记录?

PS - 我们目前处于 2.0.3.RELEASE 版本的 spring boot 和 Finchley.RELEASE 或 spring 云依赖项(因此也不能使用否定确认等功能,目前无法升级)。

标签: spring-kafkaspring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


Spring Cloud Stream 不使用容器工厂。我已经在这个答案中向你解释了这一点。

2.1 版引入了ListenerContainerCustomizer,如果您添加该类型的 bean,它将在容器创建后被调用。

Spring Boot 2.0 在一年多前就已终止,不再受支持。

我提到的答案显示了如何使用反射来添加错误处理程序。

只有在你有max.poll.records=1.


推荐阅读