spring-cloud-stream - KafkaBindingRebalanceListener Bean 未使用 Multi-binder 自动装配
问题描述
还有一篇关于类似问题的帖子。接受的答案适用于单个顶级活页夹。但是我没有多活页夹的运气。不确定我搞砸了什么,或者这种技术只支持一个顶级活页夹。
工作 YAML(只有一个顶级活页夹):
spring:
cloud:
function:
definition: consume;
stream:
function:
bindings:
consume-in-0: input
bindings:
input:
destination: students
group: groupA
kafka:
binder:
brokers: 192.168.86.23:9092
损坏的 YAML(多绑定器结构):
spring:
cloud:
function:
definition: consume;
stream:
function:
bindings:
consume-in-0: input
bindings:
input:
destination: students
group: groupA
binder: kafkaBinder
binders:
kafkaBinder:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.86.23:9092
SpringBoot应用程序:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
KafkaBindingRebalanceListener rebal() {
return new KafkaBindingRebalanceListener() {
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
}
};
}
}
另一篇文章:KafkaBindingRebalanceListener Bean not autowired by KafkaMessageChannelBinder Bean
解决方案
这是一个已知问题;有几个这样的问题;将您的声音添加到此声音中,以便获得更多关注。
今天早上我为另一个问题添加了一个临时解决方法。
如果您不介意使用反射,也可以在这里使用该技术...
@Bean
KafkaBindingRebalanceListener rebalanceListener() {
return new KafkaBindingRebalanceListener() {
@Override
public void onPartitionsAssigned(String bindingName,
org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
if (initial) {
System.out.println("SEEK");
consumer.seekToBeginning(partitions);
}
}
};
}
// BEGIN HACK TO REPLACE REBALANCE LISTENER IN BINDER
@Autowired(required = false)
private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;
@Bean
public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
BindingServiceProperties bindingServiceProperties) {
DefaultBinderFactory binderFactory = new DefaultBinderFactory(
getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
binderTypeRegistry) {
@Override
public synchronized <T> Binder<T, ?, ?> getBinder(String name,
Class<? extends T> bindingTargetType) {
Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
if (binder instanceof KafkaMessageChannelBinder) {
new DirectFieldAccessor(binder).setPropertyValue("rebalanceListener", rebalanceListener());
}
return binder;
}
};
binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
binderFactory.setListeners(this.binderFactoryListeners);
return binderFactory;
}
// Had to copy this because it's private in BindingServiceConfiguration
private static Map<String, BinderConfiguration> getBinderConfigurations(
BinderTypeRegistry binderTypeRegistry,
BindingServiceProperties bindingServiceProperties) {
Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
Map<String, BinderProperties> declaredBinders = bindingServiceProperties
.getBinders();
boolean defaultCandidatesExist = false;
Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
.entrySet().iterator();
while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
defaultCandidatesExist = binderPropertiesIterator.next().getValue()
.isDefaultCandidate();
}
List<String> existingBinderConfigurations = new ArrayList<>();
for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
.entrySet()) {
BinderProperties binderProperties = binderEntry.getValue();
if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderEntry.getKey(),
binderProperties.getEnvironment(),
binderProperties.isInheritEnvironment(),
binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
else {
Assert.hasText(binderProperties.getType(),
"No 'type' property present for custom binder "
+ binderEntry.getKey());
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderProperties.getType(),
binderProperties.getEnvironment(),
binderProperties.isInheritEnvironment(),
binderProperties.isDefaultCandidate()));
existingBinderConfigurations.add(binderEntry.getKey());
}
}
for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
.entrySet()) {
if (configurationEntry.getValue().isDefaultCandidate()) {
defaultCandidatesExist = true;
}
}
if (!defaultCandidatesExist) {
for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
.entrySet()) {
if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
binderConfigurations.put(binderEntry.getKey(),
new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
true, "integration".equals(binderEntry.getKey()) ? false : true));
}
}
}
return binderConfigurations;
}
// END HACK
推荐阅读
- c++ - 在使用预发布版本中使用 cmake 实现 Z3
- r - 即使在 {r, message=FALSE} 时,R Markdown Knitted PDF doc 也会继续显示消息
- vb.net - 文本框搜索重音字符问题?
- c# - LINQ Group 通过显示不正确的结果
- python - twitter 搜索 api - 只选择文本而不是用户
- reactjs - 在 GatsbyJS 中将自定义 React 组件导入 Markdown
- ssh - 当SSH在本地机器上终止时保持jupyter实验室笔记本运行?
- r - R中的$m是什么意思?
- javascript - 如何在 Angular 9 中显示文件的内容?
- laravel - PHP 单元测试 Laravel