首页 > 解决方案 > 合并热通量源

问题描述

在带有 Reactor 的 Spring Boot 2 中,我试图合并两个Flux热源。但是,merge似乎唯一报告Fluxmerge. 我如何merge才能识别第二个Flux

在下面的示例中,System.errinB-2甚至不打印 whenoutgoing1a是第一个参数。如果我制作outgoing2第一个,则A-2不会打印。

以下是完整示例;

package com.example.demo;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Weather {
String city;
Integer temperature;

public Weather(String city, Integer temperature) {
    this.city = city;
    this.temperature = temperature;
}

@Override
public String toString() {
    return "Weather [city=" + city + ", temperature=" + temperature + "]";
}

public static void main(String[] args) {

    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
    BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();

    // Assume Spring @Repository "A-1"
    new Thread(() -> {
        for (int d = 1; d < 1000; d += 1) {
            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
                queue.add(new Weather(s, d));
                try { Thread.sleep(250); } catch (InterruptedException e) {}
            }
        }
    }).start(); 

    // Assume Spring @Repository "B-1"
    new Thread(() -> {
        for (int d = 1; d < 1000; d += 1) {
            for (String s: new String[] {"MOS", "TLV"}) {
                queue2.add(new Weather(s, d));
                try { Thread.sleep(1000); } catch (InterruptedException e) {}
            }
        }
    }).start();

    // Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
    Flux<Weather> outgoing1 = Flux.<Weather>create(
        sink -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    sink.next(queue.take());
                    System.err.println("1 " + queue.size());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            sink.complete();
        }
    ).publishOn(Schedulers.newSingle("outgoing-1"));

    // Assume Spring @Service "B-2" = real-time MOS, TLV
    Flux<Weather> outgoing2 = Flux.<Weather>create(
            sink -> {
                for (int i = 0; i < 1000; i++) {
                    try {
                        sink.next(queue2.take());
                        System.err.println("2 " + queue2.size());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                sink.complete();
            }
        ).publishOn(Schedulers.newSingle("outgoing-2"));

    // Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
    Flux<Weather> outgoing1a = Flux.from(outgoing1)   
        .groupBy(c -> c.city)
        .flatMap(g -> g
            .sample(Duration.ofSeconds(5))
        )
        .log("C");

    // Assume Spring @Service "C" - merges "A-3" and "B-2"
    // only prints outgoing1a
    Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println); 

    // only prints outgoing2
    //Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println); 

}
}

标签: javaspring-webfluxproject-reactor

解决方案


这里有几件事在起作用。

  1. 请注意运营商的以下建议.merge...

请注意,合并适用于异步源或有限源。在处理尚未在专用调度器上发布的无限源时,您必须将该源隔离在其自己的调度器中,否则合并会在订阅另一个源之前尝试将其耗尽。

  1. 您的出站 Flux 使用.publishOn,但这只会影响链接在操作员之后.publishOn操作员。即它不会影响之前的任何内容.publishOn。具体来说,它不会影响 lambda 中的代码传递给Flux.create执行的线程。如果您在每个出站通量之前添加,您可以看到这一点.log() .publishOn

  2. 您的 lambda 传递给Flux.create调用一个阻塞方法 ( queue.take)。

由于您subscribe(...)在线程中调用合并的 Flux,因此main您的 lambda 传递给Flux.create在线程中执行main,并阻止它。

最简单的解决方法是使用.subscribeOn而不是.publishOn让传递给的 lambda 中的代码在Flux.create不同的线程(除了main)上运行。这将防止main线程阻塞,并允许两个出站流的合并输出交错。


推荐阅读