首页 > 解决方案 > 如何从 spring webflux 将专有流转换为 Flux

问题描述

我有一个自定义事件总线,我可以在其中订阅 lambda

 bus.subscribe(topic, event -> {/*gets executed for every new event*/}, exception -> {})

现在 lambda 显然在不同的线程中运行。现在我的问题是如何将这种接口连接到一个Flux<Event>?我必须自己写Publisher吗?但人们说这样做不是一个好主意。

一个模拟实现将是

import java.util.function.Consumer

class Mock extends Thread {
    Consumer<String> lambda

    public Mock(Consumer<String> lambda) {
        this.lambda = lambda
    }

    @Override
    void run() {
        while(true) {
            Thread.sleep(1000)
            lambda.accept("lala")
        }
    }
}

Flux<String> flux = new Mock({ /*TODO write to flux*/ }).start()

标签: reactive-programmingspring-webfluxproject-reactor

解决方案


你是对的,你不应该实现你自己的发布者。在大多数情况下,您也不必处理线程,而是依赖于Flux.

就像是:

Flux<Event> events = Flux.<Event>create(emitter -> {

     bus.subscribe(topic, event -> emitter.next(event),
         exc -> emitter.error(exc));

     // you should also unsubscribe
     emitter.onDispose(() -> {
         bus.unsubscribe(topic, ...);
     });
 });

推荐阅读