首页 > 解决方案 > KafkaBindingRebalanceListener Bean 未使用 Multi-binder 自动装配

问题描述

还有一篇关于类似问题的帖子。接受的答案适用于单个顶级活页夹。但是我没有多活页夹的运气。不确定我搞砸了什么,或者这种技术只支持一个顶级活页夹。

工作 YAML(只有一个顶级活页夹):

spring:
  cloud:
    function:
      definition: consume;
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
      kafka:
        binder:
          brokers: 192.168.86.23:9092

损坏的 YAML(多绑定器结构):

spring:
  cloud:
    function:
      definition: consume;
    stream:
      function:
        bindings:
          consume-in-0: input
      bindings:
        input:
          destination: students
          group: groupA
          binder: kafkaBinder
      binders:
        kafkaBinder:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: 192.168.86.23:9092

SpringBoot应用程序:

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    KafkaBindingRebalanceListener rebal() {
        return new KafkaBindingRebalanceListener() {

            @Override
            public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                                             Collection<TopicPartition> partitions, boolean initial) {
                System.out.println(bindingName + " assignments: " + partitions + ", initial call :" + initial);
            }

        };
    }
}

另一篇文章:KafkaBindingRebalanceListener Bean not autowired by KafkaMessageChannelBinder Bean

标签: spring-cloud-streamspring-cloud-stream-binder-kafka

解决方案


这是一个已知问题;有几个这样的问题;将您的声音添加到此声音中,以便获得更多关注。

今天早上我为另一个问题添加了一个临时解决方法

如果您不介意使用反射,也可以在这里使用该技术...

@Bean
KafkaBindingRebalanceListener rebalanceListener() {
    return new KafkaBindingRebalanceListener() {

        @Override
        public void onPartitionsAssigned(String bindingName,
                org.apache.kafka.clients.consumer.Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
                boolean initial) {

            if (initial) {
                System.out.println("SEEK");
                consumer.seekToBeginning(partitions);
            }
        }

    };
}

// BEGIN HACK TO REPLACE REBALANCE LISTENER IN BINDER

@Autowired(required = false)
private Collection<DefaultBinderFactory.Listener> binderFactoryListeners;

@Bean
public BinderFactory binderFactory(BinderTypeRegistry binderTypeRegistry,
        BindingServiceProperties bindingServiceProperties) {

    DefaultBinderFactory binderFactory = new DefaultBinderFactory(
            getBinderConfigurations(binderTypeRegistry, bindingServiceProperties),
            binderTypeRegistry) {

                @Override
                public synchronized <T> Binder<T, ?, ?> getBinder(String name,
                        Class<? extends T> bindingTargetType) {

                    Binder<T, ?, ?> binder = super.getBinder(name, bindingTargetType);
                    if (binder instanceof KafkaMessageChannelBinder) {
                        new DirectFieldAccessor(binder).setPropertyValue("rebalanceListener", rebalanceListener());
                    }
                    return binder;
                }


    };
    binderFactory.setDefaultBinder(bindingServiceProperties.getDefaultBinder());
    binderFactory.setListeners(this.binderFactoryListeners);
    return binderFactory;
}

// Had to copy this because it's private in BindingServiceConfiguration
private static Map<String, BinderConfiguration> getBinderConfigurations(
        BinderTypeRegistry binderTypeRegistry,
        BindingServiceProperties bindingServiceProperties) {

    Map<String, BinderConfiguration> binderConfigurations = new HashMap<>();
    Map<String, BinderProperties> declaredBinders = bindingServiceProperties
            .getBinders();
    boolean defaultCandidatesExist = false;
    Iterator<Map.Entry<String, BinderProperties>> binderPropertiesIterator = declaredBinders
            .entrySet().iterator();
    while (!defaultCandidatesExist && binderPropertiesIterator.hasNext()) {
        defaultCandidatesExist = binderPropertiesIterator.next().getValue()
                .isDefaultCandidate();
    }
    List<String> existingBinderConfigurations = new ArrayList<>();
    for (Map.Entry<String, BinderProperties> binderEntry : declaredBinders
            .entrySet()) {
        BinderProperties binderProperties = binderEntry.getValue();
        if (binderTypeRegistry.get(binderEntry.getKey()) != null) {
            binderConfigurations.put(binderEntry.getKey(),
                    new BinderConfiguration(binderEntry.getKey(),
                            binderProperties.getEnvironment(),
                            binderProperties.isInheritEnvironment(),
                            binderProperties.isDefaultCandidate()));
            existingBinderConfigurations.add(binderEntry.getKey());
        }
        else {
            Assert.hasText(binderProperties.getType(),
                    "No 'type' property present for custom binder "
                            + binderEntry.getKey());
            binderConfigurations.put(binderEntry.getKey(),
                    new BinderConfiguration(binderProperties.getType(),
                            binderProperties.getEnvironment(),
                            binderProperties.isInheritEnvironment(),
                            binderProperties.isDefaultCandidate()));
            existingBinderConfigurations.add(binderEntry.getKey());
        }
    }
    for (Map.Entry<String, BinderConfiguration> configurationEntry : binderConfigurations
            .entrySet()) {
        if (configurationEntry.getValue().isDefaultCandidate()) {
            defaultCandidatesExist = true;
        }
    }
    if (!defaultCandidatesExist) {
        for (Map.Entry<String, BinderType> binderEntry : binderTypeRegistry.getAll()
                .entrySet()) {
            if (!existingBinderConfigurations.contains(binderEntry.getKey())) {
                binderConfigurations.put(binderEntry.getKey(),
                        new BinderConfiguration(binderEntry.getKey(), new HashMap<>(),
                                true, "integration".equals(binderEntry.getKey()) ? false : true));
            }
        }
    }
    return binderConfigurations;
}

// END HACK


推荐阅读