Kafka Streams programmatic configuration is not working

Problem description

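I am working on a Spring Boot application and trying to configure Kafka programmatically, but for some reason the properties are still taken from application.yaml instead of the ones I set in code.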

@Configuration
public class KafkaConfiguration {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(30000);

        return factory;
    }

    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); // deliberately invalid: the app should fail if this value were used
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "app1");
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

@Component
public class StreamListener {
    @StreamListener(TestStreams.TEST_STREAM_IN)
    public void testStream(@Payload GenericCustomEvent response, @Headers MessageHeaders headers) throws Exception {
        log.debug("Received generic event {} with headers {}", response, headers);
    }
}

public interface TestStreams {
    String TEST_STREAM_IN = "test-stream-in";

    @Input(TEST_STREAM_IN)
    SubscribableChannel inputTestStream();
}

@EnableBinding({TestStreams.class})
@SpringBootApplication
public class KafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

Tags: spring-boot, apache-kafka, spring-kafka, spring-cloud-stream-binder-kafka

Solution


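The binder does not use a KafkaListenerContainerFactory; it creates the listener containers itself from the yaml properties. That is why the factory defined in KafkaConfiguration has no effect on the @StreamListener binding.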

You can modify the container by adding a ListenerContainerCustomizer bean.

For an example, see: "Can a graceful shutdown be applied when using Spring Cloud Stream Kafka 3.0.3.RELEASE?"
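
As a rough illustration of the customizer approach, the sketch below registers a ListenerContainerCustomizer bean that applies the 30-second poll timeout from the question to every container the binder creates. The class name BinderContainerConfiguration and the bean method name are hypothetical, and the sketch assumes the Kafka binder and spring-kafka are on the classpath. It is a minimal example under those assumptions, not necessarily the exact configuration the answer had in mind.

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;

// Hypothetical configuration class; the name is chosen for illustration only.
@Configuration
public class BinderContainerConfiguration {

    // The binder calls this customizer for each listener container it creates,
    // passing the container, the destination (topic) name, and the consumer group.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer() {
        return (container, destinationName, group) ->
                // Same poll timeout the question tried to set on its own factory.
                container.getContainerProperties().setPollTimeout(30000);
    }
}
```

Note that a customizer only adjusts a container the binder has already built. Connection settings such as the broker list are still read from the binder's own properties (for example spring.cloud.stream.kafka.binder.brokers), so the "aaa" bootstrap server in the question would have to be set there to take effect.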

