Restarting reactive messaging, e.g. after reconfiguration

Question

How can I restart, or stop and resume, reactive messaging, for example after changing the interval? This example is from the Quarkus guide: https://quarkus.io/guides/kafka-streams

@Outgoing("temperature-values")                             
public Flowable<KafkaRecord<Integer, String>> generate() {

    return Flowable.interval(500, TimeUnit.MILLISECONDS)    
            .onBackpressureDrop()
            .map(tick -> {
                WeatherStation station = stations.get(random.nextInt(stations.size()));
                double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
            });
}

Tags: quarkus, smallrye-reactive-messaging

Answer


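One option is to replace the Flowable with a Subject and use a Flowable only to feed values into that Subject. Whenever you need to change something, such as the interval, dispose of the current Flowable subscription and create a new one that feeds the same Subject. Note that nothing is emitted until setFlowable() has been called at least once.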

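// Imports assume RxJava 2 (io.reactivex) and SmallRye's KafkaRecord; adjust to the versions you use.
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.util.concurrent.TimeUnit;

class YourClass {
    // stations, random and LOG are the fields of the guide's generator class (not shown here).

    // Stable stream handed to the messaging framework; it outlives every interval.
    private Subject<KafkaRecord<Integer, String>> temperatureSubject = BehaviorSubject.create();
    // Handle to the interval that currently feeds the subject.
    private Disposable currentSubscription;

    // Call once to start emitting, and again whenever the interval should be restarted.
    public void setFlowable() {
        // Stop the previous interval, if one is still running.
        if (currentSubscription != null && !currentSubscription.isDisposed()) {
            currentSubscription.dispose();
        }
        currentSubscription = Flowable.interval(5, TimeUnit.SECONDS)
                .map(it -> {
                    WeatherStation station = stations.get(random.nextInt(stations.size()));
                    double temperature = BigDecimal.valueOf(random.nextGaussian() * 15 + station.averageTemperature)
                        .setScale(1, RoundingMode.HALF_UP)
                        .doubleValue();

                    LOG.infov("station: {0}, temperature: {1}", station.name, temperature);
                    return KafkaRecord.of(station.id, Instant.now() + ";" + temperature);
                })
                // Forward every generated record into the long-lived subject.
                .subscribe(it -> temperatureSubject.onNext(it));
    }

    @Outgoing("temperature-values")
    public Flowable<KafkaRecord<Integer, String>> generate() {
        // The channel subscribes to the subject, so swapping the interval does not affect it.
        return temperatureSubject.toFlowable(BackpressureStrategy.LATEST);
    }
}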
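
The dispose-and-recreate idea above does not depend on RxJava or Quarkus. As a rough, dependency-free sketch of the same pattern, the class below keeps one fixed sink (standing in for the Subject) and swaps the periodic task that feeds it (standing in for the disposed and recreated Flowable subscription). The class name RestartableInterval and its methods are hypothetical and exist only for this illustration; it uses a plain ScheduledExecutorService instead of Flowable.interval.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical helper: a fixed downstream sink fed by a replaceable periodic task.
final class RestartableInterval implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final LongConsumer sink;              // plays the role of the Subject
    private final AtomicLong ticks = new AtomicLong();
    private ScheduledFuture<?> current;           // plays the role of the Disposable

    RestartableInterval(LongConsumer sink) {
        this.sink = sink;
    }

    // Cancels the running task, if any, then starts a new one with the given period.
    synchronized void restart(long period, TimeUnit unit) {
        stop();
        current = scheduler.scheduleAtFixedRate(
                () -> sink.accept(ticks.getAndIncrement()), period, period, unit);
    }

    // Cancels the running task without touching the sink, so restart() can resume later.
    synchronized void stop() {
        if (current != null) {
            current.cancel(false);
            current = null;
        }
    }

    synchronized boolean isRunning() {
        return current != null && !current.isCancelled();
    }

    // Stops the task and releases the scheduler thread.
    @Override
    public void close() {
        stop();
        scheduler.shutdownNow();
    }
}
```

Calling restart(500, TimeUnit.MILLISECONDS) and later restart(5, TimeUnit.SECONDS) changes the period while the consumer of the sink stays connected, which is the property the Subject provides in the answer above.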
