java - 在另一个完成后优雅地停止一个无限通量
问题描述
我有两个Flux
例子:
- 包含正在执行的任务状态的字符串通量。例如:
Flux.just("pending", "running", "running", "successful");
这个通量是有限的,最终会在任务成功执行时结束。 - 另一个包含任务日志的字符串通量。当然,日志也是有限的,但由于与这个问题无关(并且超出我的控制)的原因,通量是无限的。它将产生有限数量的字符串,但绝不会产生
completed
信号:
Flux.<Integer> create(emitter -> { // I'm using integers for illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
我想阻止直到任务完成,同时在后台处理任务的日志。任务完成后,我想取消日志流,这样就不会一直占用资源。如果它是有限的,我可以很容易地做到这一点:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the time between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Flux.mergeDelayError(1, state.then(), logs.then())
.blockLast();
}
不幸的是,它不是......所以,我解决这个问题的第一个想法是订阅日志通量,然后在状态通量完成后终止订阅:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(5)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println); // Here the status would be sent to another microservice.
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)) // Simulate the between each log message.
.bufferTimeout(30, Duration.ofSeconds(15)) // The logs are again sent to another microservice, but we
// don't want to make a separate request for each log
// line, so we batch them.
.doOnNext(System.out::println);
Disposable logsSubscription = logs.subscribe();
state.doFinally(signalType -> logsSubscription.dispose()).blockLast();
}
这很有效,但它与日志缓冲 ( .bufferTimeout(30, Duration.ofSeconds(15))
) 配合得不好,有时会导致任务日志丢失。例如,两个日志进入缓冲区,但在达到缓冲区限制(或超过超时)之前,日志通量被取消。因此,这两个日志将永远不会得到处理。
我的第二个想法是用某种方式takeUntil(Predicate<? super T> predicate)
决定何时停止从日志通量中获取元素。但是,这是有问题的,因为只有在通量中有新元素时才会调用谓词。这意味着如果在发出最后一个日志之后任务完成,则永远不会调用谓词。但是,takeUntil
如果我将无限的日志流与另一个无限的虚拟对象流合并,我可以使用,以确保takeUntil
定期触发:
private static boolean completed;
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(2)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.doFinally(signalType -> {
completed = true;
});
Flux<Integer> logs = Flux.<Integer> create(emitter -> { // The logs are usually strings, but I'm using integers for
// illustration purposes.
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(4)); // Simulate the time between each log message.
Flux<Integer> healthCheck = Flux.<Integer> generate(sink -> {
sink.next(1);
})
.delayElements(Duration.ofSeconds(3));
Flux<List<Integer>> logsWithHealthCheck = Flux.merge(logs, healthCheck)
.takeUntil(i -> completed)
.filter(i -> i != 1)
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logsWithHealthCheck);
mergeDelayError.then()
.block();
}
这工作正常,但它似乎有点...... hacky。有没有更好的方法来完成我想要的?
解决方案
我能够通过使用takeUntilOther
状态通量并将其转变为热源来解决我的问题:
public static void main(String[] args) {
Flux<String> state = Flux.just("pending", "running", "running", "success")
.delayElements(Duration.ofSeconds(10)) // Simulate the time it takes for a task to finish.
.doOnNext(System.out::println) // Here the status would be sent to another microservice.
.replay() // Important
.autoConnect();
Flux<List<Integer>> logs = Flux.<Integer> create(emitter -> {
for (int i = 50; i < 60; i++) {
emitter.next(i);
}
// Note that there's no emitter.complete() here.
})
.delayElements(Duration.ofSeconds(5)) // Simulate the time between each log message.
.takeUntilOther(state.then())
.bufferTimeout(30, Duration.ofSeconds(15))
.doOnNext(System.out::println)
.doFinally(System.out::println);
Flux<Object> mergeDelayError = Flux.mergeDelayError(1, state, logs);
mergeDelayError.then()
.block();
}
热源部分很重要,否则.takeUntilOther(state.then())
和mergeDelayError.then().block()
行将在状态通量上创建两个订阅,这将复制它正在完成的工作。
推荐阅读
- javascript - 访问替换函数中的第一个正则表达式匹配组
- python - 使用 OpenCV 填充边缘定义区域
- jboss - 删除某些 JBOSS 记录器的记录
- javascript - firebase:获取 ThenableReference 的密钥
- kubernetes - kafka.common.KafkaException:无法解析来自 Zookeeper 的代理信息
- class - 如何调用比较类中的对象
- javascript - QML WebEngineView 和 DOM 元素更新
- video - 如何提高来自 IPFS 的文件的下载速度?
- android - Flutter - 拍照
- go - 如何运行一个 GO 应用程序的多个实例?