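spring-cloud-stream - How to create dynamic stream listeners in Spring Cloud Stream Kafka?
Question
I am using Spring Cloud Stream Kafka. I have StreamListeners for predefined topics. My new requirement is to create and stop StreamListeners at runtime for user-defined topic names. From the user interface, the user will decide and update which topic names are listened to and which topic listeners (StreamListeners) are stopped. Is there a way to have flexible StreamListeners at runtime? I tried using BinderAwareChannelResolver, but I got an UnknownBinder configuration error when assigning different binders to different bindings. I could not find a detailed sample that covers my requirement with BinderAwareChannelResolver.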
@Autowired
private SubscribableChannelBindingTargetFactory bindingTargetFactory;
@Autowired
private BindingService bindingService;
@Autowired
BinderFactory binderFactory;
// The statements below run inside a method; `channel` and `channelName` are defined elsewhere (not shown).
BindingServiceProperties properties = bindingService.getBindingServiceProperties();
properties.getConsumerProperties(channelName).setBatchMode(true);
String binderConfigurationName = properties.getBinder(channelName);
Binder<SubscribableChannel, ConsumerProperties, ?> binder =
        (Binder<SubscribableChannel, ConsumerProperties, ?>) binderFactory.getBinder(binderConfigurationName, channel.getClass());
copyExtendedConsumerProperties(binder, channelName);
bindingService.bindConsumer(channel, channelName);
channel.subscribe(new DynamicMessageHandler());
private void copyExtendedConsumerProperties(Binder binder, String channelName) {
    KafkaConsumerProperties extension = (KafkaConsumerProperties) ((ExtendedPropertiesBinder) binder).getExtendedConsumerProperties(channelName);
    extension.getConfiguration().put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, PersonDeserializer.class.getName());
    extension.getConfiguration().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "3000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "900000");
    extension.getConfiguration().put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "5000");
    extension.getConfiguration().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}
public class DynamicMessageHandler implements MessageHandler {
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        List<Person> personList = (List<Person>) message.getPayload();
        System.out.println(personList.size());
    }
}
Answer
Here is an example of two StreamListener methods, both of which are stopped at application startup.
@Autowired
BindingsEndpoint endpoint;

@StreamListener("input-1")
public void listen1(String in) {
    System.out.println();
}

@StreamListener("input-2")
public void listen2(String in) {
    System.out.println();
}

@Bean
public ApplicationRunner runner() {
    return args -> {
        // Stop both bindings once the application has started.
        endpoint.changeState("input-1", State.STOPPED);
        endpoint.changeState("input-2", State.STOPPED);
    };
}
When you run this application, it starts with both StreamListeners stopped. The user interface you mentioned then needs to expose the binding names. From the interface, the user selects, say, input-1 to start. The StreamListener method listen1 is then started, while the other one (listen2) remains stopped. You can implement a REST endpoint that accepts the binding name so that the binding can be started through the BindingsEndpoint.
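The logic behind such a REST endpoint could look roughly like the sketch below. It is written in plain Java so it can be run on its own: BindingToggleService, its nested State enum, and the stateChanger callback are hypothetical names, not Spring Cloud Stream API. In a real application the callback would delegate to endpoint.changeState(name, state), and the service would be called from a controller method. The sketch assumes all bindings were stopped at startup, as in the runner above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical helper: validates a user-supplied binding name and toggles it.
class BindingToggleService {

    // Local stand-in for the STARTED/STOPPED states used with BindingsEndpoint.
    enum State { STARTED, STOPPED }

    private final Set<String> knownBindings;
    // Assumption: in a real app this would call endpoint.changeState(name, state).
    private final BiConsumer<String, State> stateChanger;
    private final Map<String, State> current = new ConcurrentHashMap<>();

    BindingToggleService(Set<String> knownBindings, BiConsumer<String, State> stateChanger) {
        this.knownBindings = Set.copyOf(knownBindings);
        this.stateChanger = stateChanger;
        // Assumes every binding was stopped at application startup.
        for (String name : this.knownBindings) {
            current.put(name, State.STOPPED);
        }
    }

    // Reject names the UI should never send, then delegate the state change.
    State change(String bindingName, State target) {
        if (!knownBindings.contains(bindingName)) {
            throw new IllegalArgumentException("Unknown binding: " + bindingName);
        }
        stateChanger.accept(bindingName, target);
        current.put(bindingName, target);
        return target;
    }

    // Snapshot of binding states that the user interface can display.
    Map<String, State> states() {
        return Map.copyOf(current);
    }

    public static void main(String[] args) {
        BindingToggleService service = new BindingToggleService(
                Set.of("input-1", "input-2"),
                (name, state) -> System.out.println("changeState(" + name + ", " + state + ")"));
        service.change("input-1", State.STARTED);
        System.out.println("input-1=" + service.states().get("input-1")
                + ", input-2=" + service.states().get("input-2"));
    }
}
```

Checking the name against a fixed set of known bindings keeps arbitrary user input from reaching the binding layer. Whether that set is hard-coded or discovered at runtime is a design choice left open here.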
BinderAwareChannelResolver is meant for dynamic destinations and is used for outbound purposes, so I am not sure how it would help in your use case. In any case, BinderAwareChannelResolver is deprecated in the latest versions of Spring Cloud Stream.
Starting with the 3.0.x versions, Spring Cloud Stream mainly prefers processors and consumers written in a functional style. Although StreamListener is still available in 3.0.x, its usage is deprecated starting with 3.1.x. We suggest you update your StreamListener methods to a functional style.
The two StreamListener methods above can be rewritten as follows:
@Bean
public Consumer<String> listen1() {
    return s -> {};
}

@Bean
public Consumer<String> listen2() {
    return s -> {};
}
By default, the binding names will be listen1-in-0 and listen2-in-0, which you can change. See the docs.
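As a rough illustration of changing those names, a configuration along the following lines could be placed in application.properties. The topic names topic-1 and topic-2 are placeholders, and reusing input-1 and input-2 as the binding names is only an assumption made to match the earlier StreamListener example.

```properties
# Register both functional beans so that bindings are created for them.
spring.cloud.function.definition=listen1;listen2

# Map the generated binding names to more descriptive ones.
spring.cloud.stream.function.bindings.listen1-in-0=input-1
spring.cloud.stream.function.bindings.listen2-in-0=input-2

# Placeholder topic names; replace them with the real destinations.
spring.cloud.stream.bindings.input-1.destination=topic-1
spring.cloud.stream.bindings.input-2.destination=topic-2
```

With such a mapping in place, the names passed to BindingsEndpoint would be input-1 and input-2 rather than the generated defaults.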