首页 > 解决方案 > 如何从通量中取消订阅项目

问题描述

考虑以下通量

FluxSink<String> sink;
Flux<String> flux1 = Flux
    .<String>create(emitter -> { 
         sink = emitter;
    },...)
    .cache()
    .publish()
    .autoConnect();

所以要添加/订阅一个项目,我们可以做sink.next(“4”);

flux1.subscribe(item -> log.info(“item: “+item);

通过过滤flux1,比如从元素“2”开始,并没有从通量中删除该项目。我知道Flux发布者是不可变的。

如果我们可以通过接收器添加到它,我们如何从中删除一个项目flux1

标签: project-reactor

解决方案


正确的思考给出正确的答案

将 Flux 视为不可变的消息流。它就像一条河流,你可以往里面加点水,但你不能把已经流下来的水倒回去。但是,您可以将水过滤到下游。

如果您需要从流中“删除”非法元素,您可以过滤它们:

flux1.filter(e -> !e.equal("2"))
     .subscribe(item -> log.info(“item: “+item);

通量不是我们习惯的数据结构,而是一个数据流,你不能在数据提供点修改它,但可以操纵它们到达端点的方式


推荐阅读