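spring-boot - Unexpected circular dependency when combining spring-kafka and reactor-kafka
Problem description
The code below produces an unexpected circular dependency between receiverOptions and template.
Surprisingly, it works if kafkaProps is removed from the Spring context.
It looks like some auto-configuration adds an unnecessary dependency between the template and the receiver options.
Please suggest the correct way to configure ReactiveKafkaConsumerTemplate.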
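import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    // Consumer configuration exposed as a plain Map bean.
    @Bean
    public Map<String, Object> kafkaProps() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092",
                ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName()
        );
    }

    // The Map<String, Object> parameter is resolved by type; this is where the cycle starts.
    @Bean
    public ReceiverOptions<String, String> receiverOptions(Map<String, Object> kafkaProps) {
        return ReceiverOptions.<String, String>create(kafkaProps)
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new StringDeserializer())
                .subscription(List.of("test-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> template(ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    // Starts consuming and logs every received record.
    @Bean
    public ApplicationRunner runner(ReactiveKafkaConsumerTemplate<String, String> kafkaReceiver) {
        return args -> kafkaReceiver.receive()
                .log()
                .subscribe();
    }
}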
The complete project is available at https://github.com/piddubnyi/spring-reactor-kafka-ciclyc
Full stack trace:
2020-01-07 16:29:51.031 DEBUG 24915 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter : Application failed to start due to an exception
org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name 'receiverOptions': Requested bean is currently in creation: Is there an unresolvable circular reference?
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.beforeSingletonCreation(DefaultSingletonBeanRegistry.java:339) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:215) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1287) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.addCandidateEntry(DefaultListableBeanFactory.java:1503) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1467) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveMultipleBeans(DefaultListableBeanFactory.java:1386) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1245) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1207) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:885) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:789) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:539) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879) ~[spring-beans-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at com.example.demo.DemoApplication.main(DemoApplication.java:19) ~[main/:na]
Solution
I guess Spring doesn't know which Map<String, Object> to inject. Try adding @Qualifier("kafkaProps") to the kafkaProps parameter:
@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Map<String, Object> kafkaProps) {
    // ...
}
Update
Strangely, after adding @Qualifier, Spring wraps kafkaProps in another map...
For now, maybe just use Properties instead of a Map:
@Bean
public Properties kafkaProps() {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, DemoApplication.class.getSimpleName());
    return props;
}

@Bean
public ReceiverOptions<String, String> receiverOptions(@Qualifier("kafkaProps") Properties kafkaProps) {
    // ...
}
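Update 2
About the strange injection of Map<String, Object>: it is actually not strange at all.
When a Map<String, SomeType> is injected by type, it is handled in a special way: Spring collects all beans of type SomeType into it, with the bean names as keys. In your case, Map<String, Object> is therefore a map of every bean in the context, which obviously ends in a circular reference. The same happens for a List<Object> or a Set<Object>.
Adding the @Qualifier annotation does not change this behavior; it only filters the beans. The kafkaProps map is then the only bean that passes the filter, which is why it appears to be "wrapped" in another map.
As the Spring reference documentation puts it:
"Even typed Map instances can be autowired as long as the expected key type is String. The map values contain all beans of the expected type, and the keys contain the corresponding bean names (...)"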
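To make the mechanism concrete, here is a minimal sketch in plain Java. ToyContainer and all of its methods are hypothetical names invented for this illustration; it is a toy model of by-type collection versus by-name lookup, not Spring's actual implementation. It assumes, as the stack trace above suggests, that the requesting bean itself is skipped when candidates are collected, so the cycle runs through template.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model only: a hypothetical, heavily simplified container, not Spring's code.
public class ToyContainer {
    private final Map<String, Class<?>> declaredTypes = new LinkedHashMap<>();
    private final Map<String, Function<ToyContainer, Object>> factories = new LinkedHashMap<>();
    private final Map<String, Object> singletons = new LinkedHashMap<>();
    private final Deque<String> inCreation = new ArrayDeque<>();

    public void register(String name, Class<?> type, Function<ToyContainer, Object> factory) {
        declaredTypes.put(name, type);
        factories.put(name, factory);
    }

    // Injection by name: resolves exactly one bean, creating it on first use.
    public Object getBean(String name) {
        Object existing = singletons.get(name);
        if (existing != null) {
            return existing;
        }
        if (inCreation.contains(name)) {
            throw new IllegalStateException("Bean '" + name + "' is currently in creation");
        }
        inCreation.push(name);
        try {
            Object bean = factories.get(name).apply(this);
            singletons.put(name, bean);
            return bean;
        } finally {
            inCreation.pop();
        }
    }

    // Injection by type into Map<String, T>: collects every bean assignable to T,
    // keyed by bean name, skipping only the bean that is asking (an assumption of this model).
    public <T> Map<String, T> getBeansOfType(Class<T> type) {
        String requester = inCreation.peek();
        Map<String, T> result = new LinkedHashMap<>();
        for (Map.Entry<String, Class<?>> e : declaredTypes.entrySet()) {
            String name = e.getKey();
            if (!name.equals(requester) && type.isAssignableFrom(e.getValue())) {
                result.put(name, type.cast(getBean(name)));
            }
        }
        return result;
    }

    // Registers stand-ins for the three beans from the question;
    // byName selects how receiverOptions obtains its configuration map.
    public static String demo(boolean byName) {
        ToyContainer c = new ToyContainer();
        c.register("kafkaProps", Map.class, ctx -> Map.of("group.id", "demo"));
        c.register("receiverOptions", String.class, ctx -> {
            Object props = byName ? ctx.getBean("kafkaProps") : ctx.getBeansOfType(Object.class);
            return "options" + props;
        });
        c.register("template", String.class, ctx -> "template(" + ctx.getBean("receiverOptions") + ")");
        try {
            c.getBean("receiverOptions");
            return String.valueOf(c.getBean("template"));
        } catch (IllegalStateException ex) {
            return "FAILED: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // by type: fails with a circular reference
        System.out.println(demo(true));  // by name: succeeds
    }
}
```

In this model the by-type variant fails because collecting every bean forces template to be created while receiverOptions is still in creation, and template in turn asks for receiverOptions. The by-name variant touches only kafkaProps, so no cycle arises.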
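Switching to injection by name should solve the problem. Normally this is done by using @Resource(name = "kafkaProps") instead of the (implicit) @Autowired annotation, but unfortunately @Resource cannot be placed on method parameters.
There is another solution, though. Since both beans are declared in the same @Configuration class, you can call kafkaProps() directly. Spring returns the same bean rather than a new instance on every call (note that this does not work for static methods):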
@Bean
public ReceiverOptions<String, String> receiverOptions() {
    return ReceiverOptions.<String, String>create(kafkaProps())
    ...
}