How do I create dynamic stream listeners in Spring Cloud Stream Kafka?

Problem description

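I am using Spring Cloud Stream Kafka and have StreamListeners for predefined topics. My new requirement is to create and stop StreamListeners at runtime for user-defined topic names: from the user interface, the user decides which topic names are listened to and which topic listeners (StreamListeners) are stopped. Is there a way to have flexible StreamListeners at runtime? I tried using BinderAwareChannelResolver, but I got an UnknownBinder configuration error when assigning different binders to different bindings. I could not find a detailed example that covers my requirement with BinderAwareChannelResolver.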

@Autowired
private SubscribableChannelBindingTargetFactory bindingTargetFactory;

@Autowired
private BindingService bindingService;

@Autowired
BinderFactory binderFactory;

// Enable batch mode for the consumer bound to this channel.
BindingServiceProperties properties = bindingService.getBindingServiceProperties();
properties.getConsumerProperties(channelName).setBatchMode(true);

// Look up the binder configured for this binding and apply the Kafka-specific consumer settings.
String binderConfigurationName = properties.getBinder(channelName);
Binder<SubscribableChannel, ConsumerProperties, ?> binder = (Binder<SubscribableChannel, ConsumerProperties, ?>) binderFactory.getBinder(binderConfigurationName, channel.getClass());
copyExtendedConsumerProperties(binder, channelName);

// Bind the channel to the destination and attach the message handler.
bindingService.bindConsumer(channel, channelName);
channel.subscribe(new DynamicMessageHandler());

private void copyExtendedConsumerProperties(Binder binder, String channelName) {
    KafkaConsumerProperties extension = (KafkaConsumerProperties) ((ExtendedPropertiesBinder) binder).getExtendedConsumerProperties(channelName);
    extension.getConfiguration().put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PersonDeserializer.class.getName());
    extension.getConfiguration().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "3000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "900000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000");
    extension.getConfiguration().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

public class DynamicMessageHandler implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        List<Person> personList = (List<Person>) message.getPayload();
        System.out.println(personList.size());
    }
}

Tags: spring-cloud-stream, spring-cloud-stream-binder-kafka

Solution


Here is an example with two StreamListener methods, both of which are stopped at application startup.

@Autowired
BindingsEndpoint endpoint;

@StreamListener("input-1")
public void listen1(String in) {
 System.out.println();
}

@StreamListener("input-2")
public void listen2(String in) {
 System.out.println();
}

@Bean
public ApplicationRunner runner() {
  return args -> {
     // Stop both bindings once the application has started.
     endpoint.changeState("input-1", State.STOPPED);
     endpoint.changeState("input-2", State.STOPPED);
  };
}

When you run this application, it starts with both StreamListeners stopped. The user interface you mentioned then needs to expose the binding names. From the interface, the user selects, say, input-1 to start. The StreamListener method listen1 is then started, and the other one (listen2) remains stopped. You can implement a REST endpoint that accepts the binding name and starts the binding through the BindingsEndpoint.
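The dispatch logic behind such an endpoint can be sketched without any framework code. This is a minimal sketch under stated assumptions, not the library's API: BindingToggleService, its State enum, and the stateChanger callback are hypothetical names. In a real application the callback would presumably delegate to endpoint.changeState(name, state), and a REST controller would call changeState on this service.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical helper: tracks known bindings and forwards valid state changes.
class BindingToggleService {

    // Stand-in for the two binding states used in the example above.
    enum State { STARTED, STOPPED }

    private final Map<String, State> states = new TreeMap<>();
    private final BiConsumer<String, State> stateChanger;

    // All known bindings begin as STOPPED, matching the startup behaviour above.
    BindingToggleService(Set<String> bindingNames, BiConsumer<String, State> stateChanger) {
        this.stateChanger = stateChanger;
        for (String name : bindingNames) {
            states.put(name, State.STOPPED);
        }
    }

    // Returns false for unknown binding names instead of forwarding them.
    synchronized boolean changeState(String bindingName, State target) {
        if (!states.containsKey(bindingName)) {
            return false;
        }
        // Assumption: in a Spring app this would call endpoint.changeState(...).
        stateChanger.accept(bindingName, target);
        states.put(bindingName, target);
        return true;
    }

    // Copy of the current states, e.g. for the UI to list the binding names.
    synchronized Map<String, State> snapshot() {
        return new TreeMap<>(states);
    }

    public static void main(String[] args) {
        BindingToggleService service = new BindingToggleService(
                Set.of("input-1", "input-2"),
                (name, state) -> System.out.println("changeState(" + name + ", " + state + ")"));
        service.changeState("input-1", State.STARTED);
        System.out.println(service.snapshot());
    }
}
```

Rejecting unknown names in one place keeps user input from the interface away from the binding lifecycle calls until it has been checked against the bindings the application actually declares.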

BinderAwareChannelResolver is meant for dynamic destinations and is used for outbound purposes, so I am not sure how it would help in your use case. In any case, BinderAwareChannelResolver is deprecated in the latest versions of Spring Cloud Stream.

Starting with the 3.0.x versions, Spring Cloud Stream mainly favors processors and consumers written in a functional style. StreamListener is still available in 3.0.x, but its usage is deprecated starting with 3.1.x. We suggest you update your StreamListener methods to the functional style.

The two StreamListener methods above can be re-written as below:

@Bean
public Consumer<String> listen1() {
  return s -> {};
}

@Bean
public Consumer<String> listen2() {
  return s -> {};
}

By default, the binding names will be listen1-in-0 and listen2-in-0, which you can change. See the Spring Cloud Stream documentation.
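As a rough illustration of changing those names, the configuration below maps the default binding names back to input-1 and input-2 so that they match the names used in the StreamListener example. The destination values my-topic-1 and my-topic-2 are hypothetical placeholders; treat this as a sketch and check it against the documentation for your version.

```properties
# Needed when more than one functional bean is present
spring.cloud.function.definition=listen1;listen2

# Rename the default binding names
spring.cloud.stream.function.bindings.listen1-in-0=input-1
spring.cloud.stream.function.bindings.listen2-in-0=input-2

# Hypothetical topic names for each binding
spring.cloud.stream.bindings.input-1.destination=my-topic-1
spring.cloud.stream.bindings.input-2.destination=my-topic-2
```

With this mapping, the binding names passed to the BindingsEndpoint would be input-1 and input-2 rather than listen1-in-0 and listen2-in-0.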

