首页 > 解决方案 > 在 Reactor 中使用 Flux.push() 丢失一些项目

问题描述

我的代码如下:

public class SequenceCreator {

    public Consumer<List<Integer>> consumer;

    public Flux<Integer> createNumberSequence() {
        return Flux.push(sink -> consumer = items -> items.forEach(sink::next));
    }

    public static void main(String[] args) throws InterruptedException {
        SequenceCreator sequenceCreator = new SequenceCreator();

        List<Integer> sequence1 = Lists.newArrayList(1,2,3,4,5);
        List<Integer> sequence2 = Lists.newArrayList(6,7,8,9,10);

        Thread producingThread1 = new Thread(
                () -> sequenceCreator.consumer.accept(sequence1));

        Thread producingThread2 = new Thread(
                () -> sequenceCreator.consumer.accept(sequence2));

        sequenceCreator.createNumberSequence().subscribe(System.out::println);

        producingThread1.start();
        producingThread2.start();

        while (true) {
            Thread.sleep(1000);
        }
    }
}

输出是

1 2 3 4 5 7 8 9 10

不知道为什么没有输出数字6?是多线程的原因吗?

标签: spring-webfluxproject-reactorreactor

解决方案


不知道为什么没有输出数字6?是多线程的原因吗?

是的,几乎可以肯定。看一下Javadoc Flux.push

通过 FluxSink API以编程方式创建具有从单线程生产者发出多个元素的能力的 Flux 。有关支持多线程的替代方案,请参阅create(Consumer)。

您没有使用单线程生产者(违反记录的要求),因此在这种情况下行为本质上是未定义的。您需要Flux.create按照文档的建议切换到,因为您使用多个线程进行发布。


推荐阅读