首页 > 解决方案 > 并行生产者和消费者使用项目反应器隐藏延迟

问题描述

我正在尝试使用 Spring 中的项目 Reactor 并行化生产者和消费者以隐藏延迟,以便生产者提前生成一个项目并将其放入队列中供消费者(比生产者慢 10 倍)处理但是我发现,如果我在顺序模式下使用缓冲区,生产者只是在使用并行调度程序或生成列表而不是单个项目时对所有内容进行排队。

Flux.generate(() -> 1,
        (state, sink) -> {

            if (state < 10) {
                log.info("[P] {}", state);
                sink.next(state++);
            }
            else {
                log.info("[P] Done!");
                sink.complete();
            }

            try {
                TimeUnit.MILLISECONDS.sleep(100);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            return state;
        })
.parallel(2)
.runOn(Schedulers.parallel())
.sequential()
.subscribe(item -> {
    log.info("[C] {}", item);
    try {
        TimeUnit.SECONDS.sleep(1);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
});

我想要实现的是这种模式,其中括号中的操作(P - 生产者,C - 消费者)并行发生。

时间步长 手术
T1 P1
T2 P2、C1
T3 P3、C2
T4 P4、C3
T5 P5、C4
T6 C5

这在 Reactor 项目中可行吗?

标签: spring-bootproject-reactor

解决方案


推荐阅读