首页 > 解决方案 > 在 Spring Boot 中,如何配置一些侦听器使用自动确认而其他侦听器使用手动确认

问题描述

我有一个简单的 Spring Boot 应用程序,其中包含多个用 @KafkaListener 注释的方法。我希望他们中的一些人使用自动确认,而其他人使用手动确认。是否可以在不手动设置容器工厂的情况下使用 Spring Boot 配置来实现这一点?根据文档 使用 @KafkaListener 需要类似的代码

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

这是否意味着我需要创建两个容器工厂,一个用于自动确认,一个用于手动确认并配置所有侦听器以引用正确的工厂?

标签: javaspring-bootapache-kafkaspring-kafka

解决方案


这是否意味着我需要创建两个容器工厂,一个用于自动确认,一个用于手动确认并配置所有侦听器以引用正确的工厂?

假设是正确的。您只需为一个ListenerContainerFactorybean 提供一组,ContainerProperties而为另一个 bean 提供不同的组。有关更多信息,请参阅此 JavaDocs AbstractKafkaListenerContainerFactory

/**
 * Obtain the properties template for this factory - set properties as needed
 * and they will be copied to a final properties instance for the endpoint.
 * @return the properties.
 */
public ContainerProperties getContainerProperties()

containerFactory您可以考虑为ack 模式和一组侦听器依赖该属性的常规 bean 名称:

/**
 * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
 * to use to create the message listener container responsible to serve this endpoint.
 * <p>
 * If not specified, the default container factory is used, if any. If a SpEL
 * expression is provided ({@code #{...}}), the expression can either evaluate to a
 * container factory instance or a bean name.
 * @return the container factory bean name.
 */
String containerFactory() default "";

默认一个是KafkaListenerAnnotationBeanPostProcessor.DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME- kafkaListenerContainerFactory


推荐阅读