首页 > 解决方案 > Spring Cloud Stream 3.0 StreamsBuilderFactoryBeanCustomizer

问题描述

我无法使用 StreamsBuilderFactoryBeanCustomizer 为我的流使用者配置自定义处理程序。

@Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() { 
        return fb -> {          
    fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
            fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);               
            fb.getStreamsConfiguration().forEach((k,v) -> System.err.println("Key , Value "+ k + " , " + v));
        };
    }

在我的 spring boot @Configuration 类中设置上述内容并启动应用程序后,我仍然在 SteamConfig 下看到默认值。根据文档

块引用

定制器将在工厂 bean 启动之前由绑定器调用

块引用

但似乎 binder 没有调用 StreamsBuilderFactoryBeanCustomizer 这是一个已知问题吗? 我正在使用带有 2020.0.1 (spring-cloud.version) 2.4.2(spring boot) 的 spring-cloud-stream-binder-kafka-streams

021-03-11 11:52:04.386 [main] INFO  o.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    acceptable.recovery.lag = 10000
    application.id = service-stream
    application.server = 
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    built.in.metrics.version = latest
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 1000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    max.task.idle.ms = 0
    max.warmup.replicas = 2

标签: spring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


看起来有两个同名的接口:一个来自Spring Kafka,另一个来自Spring Boot。binder 只考虑 Spring Kafka 中的那个。请确保您正在实施那个。我们在 Boot 中提交了一个问题来解决这种不一致问题。


推荐阅读