spring-cloud-stream - Spring Cloud Stream 3.0 StreamsBuilderFactoryBeanCustomizer
问题描述
我无法使用 StreamsBuilderFactoryBeanCustomizer 为我的流使用者配置自定义处理程序。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);
fb.getStreamsConfiguration().forEach((k,v) -> System.err.println("Key , Value "+ k + " , " + v));
};
}
在我的 spring boot @Configuration 类中设置上述内容并启动应用程序后,我仍然在 SteamConfig 下看到默认值。根据文档
块引用
块引用
但似乎 binder 没有调用 StreamsBuilderFactoryBeanCustomizer 这是一个已知问题吗? 我正在使用带有 2020.0.1 (spring-cloud.version) 2.4.2(spring boot) 的 spring-cloud-stream-binder-kafka-streams
021-03-11 11:52:04.386 [main] INFO o.apache.kafka.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = service-stream
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
max.task.idle.ms = 0
max.warmup.replicas = 2
解决方案
看起来有两个同名的接口:一个来自Spring Kafka,另一个来自Spring Boot。binder 只考虑 Spring Kafka 中的那个。请确保您正在实施那个。我们在 Boot 中提交了一个问题来解决这种不一致问题。
推荐阅读
- sql - 如何按相邻值划分?
- javascript - 如何在反应中从我的状态中提取值?
- python - 熊猫值错误仍然显示,但代码完全正确,并且可以正常加载可视化
- javascript - 一个连接 - 多个通道
- nsvaluetransformer - 如何为存储在核心数据中的属性字符串实现 NSAttributedStringValueTransformer
- visual-studio-code - VS Code / IDE:如何为运行时声明的变量启用 cmd+click goto 定义?
- angular - Angular Fire Auth,自定义/扩展注册/用户创建,带有验证邮件和路由
- angular - 如何在 ngFor Loop 中动态设置 Kendo 组合框或 Kendo 下拉列表的值?
- php - 当我单击注册按钮时,laravel 7没有任何反应,似乎事件没有触发
- amazon-web-services - 如何使用 aws-cdk 获取托管在 VPC 中的 SFTP 服务器的子网 IP 地址?