首页 > 解决方案 > 在另一个完成后优雅地停止一个无限通量

问题描述

我有两个Flux例子:

Flux.<Integer> create(emitter -> { // I'm using integers for illustration purposes.
  for (int i = 50; i < 60; i++) {
    emitter.next(i);
  }
  // Note that there's no emitter.complete() here.
})

我想阻止直到任务完成,同时在后台处理任务的日志。任务完成后,我想取消日志流,这样就不会一直占用资源。如果它是有限的,我可以很容易地做到这一点:

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println); // Here the status would be sent to another microservice.

    Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                                  // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                                   .delayElements(Duration.ofSeconds(4)) // Simulate the time between each log message.
                                   .bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
                                                                              // don't want to make a separate request for each log
                                                                              // line, so we batch them.
                                   .doOnNext(System.out::println);
    Flux.mergeDelayError(1, state.then(), logs.then())
        .blockLast();
}

不幸的是,它不是......所以,我解决这个问题的第一个想法是订阅日志通量,然后在状态通量完成后终止订阅:

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println); // Here the status would be sent to another microservice.

    Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                                  // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                                   .delayElements(Duration.ofSeconds(4)) // Simulate the between each log message.
                                   .bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
                                                                              // don't want to make a separate request for each log
                                                                              // line, so we batch them.
                                   .doOnNext(System.out::println);
    Disposable logsSubscription = logs.subscribe();
    state.doFinally(signalType -> logsSubscription.dispose()).blockLast();
}

这很有效,但它与日志缓冲 ( .bufferTimeout(30, Duration.ofSeconds(15))) 配合得不好,有时会导致任务日志丢失。例如,两个日志进入缓冲区,但在达到缓冲区限制(或​​超过超时)之前,日志通量被取消。因此,这两个日志将永远不会得到处理。
我的第二个想法是用某种方式takeUntil(Predicate<? super T> predicate)决定何时停止从日志通量中获取元素。但是,这是有问题的,因为只有在通量中有新元素时才会调用谓词。这意味着如果在发出最后一个日志之后任务完成,则永远不会调用谓词。但是,takeUntil如果我将无限的日志流与另一个无限的虚拟对象流合并,我可以使用,以确保takeUntil定期触发:

private static boolean completed;

public static void main(String[] args) {
    Flux<String> state = Flux.just("pending", "running", "running", "success")
                             .delayElements(Duration.ofSeconds(2)) // Simulate the time it takes for a task to finish.
                             .doOnNext(System.out::println) // Here the status would be sent to another microservice.
                             .doFinally(signalType -> {
                                 completed = true;
                             });

    Flux<Integer> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
                                                            // illustration purposes.
        for (int i = 50; i < 60; i++) {
            emitter.next(i);
        }
        // Note that there's no emitter.complete() here.
    })
                             .delayElements(Duration.ofSeconds(4)); // Simulate the time between each log message.

    Flux<Integer> healthCheck = Flux.<Integer> generate(sink -> {
        sink.next(1);
    })
                                    .delayElements(Duration.ofSeconds(3));
    Flux<List<Integer>> logsWithHealthCheck = Flux.merge(logs, healthCheck)
                                                  .takeUntil(i -> completed)
                                                  .filter(i -> i != 1)
                                                  .bufferTimeout(30, Duration.ofSeconds(15))
                                                  .doOnNext(System.out::println)
                                                  .doFinally(System.out::println);
    Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logsWithHealthCheck);
    mergeDelayError.then()
                   .block();
}

这工作正常,但它似乎有点...... hacky。有没有更好的方法来完成我想要的?

标签: javaproject-reactor

解决方案


我能够通过使用takeUntilOther状态通量并将其转变为热源来解决我的问题:

    public static void main(String[] args) {
        Flux<String> state = Flux.just("pending", "running", "running", "success")
                                 .delayElements(Duration.ofSeconds(10)) // Simulate the time it takes for a task to finish.
                                 .doOnNext(System.out::println) // Here the status would be sent to another microservice.
                                 .replay() // Important
                                 .autoConnect();

        Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> {
            for (int i = 50; i < 60; i++) {
                emitter.next(i);
            }
            // Note that there's no emitter.complete() here.
        })
                                       .delayElements(Duration.ofSeconds(5)) // Simulate the time between each log message.
                                       .takeUntilOther(state.then())
                                       .bufferTimeout(30, Duration.ofSeconds(15))
                                       .doOnNext(System.out::println)
                                       .doFinally(System.out::println);
        Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logs);
        mergeDelayError.then()
                       .block();
    }

热源部分很重要,否则.takeUntilOther(state.then())mergeDelayError.then().block()行将在状态通量上创建两个订阅,这将复制它正在完成的工作。


推荐阅读