spring - 具有并行性的 Reactor GroupBy 在同一线程上运行
问题描述
我正在尝试实现每个组的并行性,其中分组元素并行运行,并且在组内每个元素按顺序工作。然而对于下面的代码,第一个发射使用并行线程,但对于后续发射它使用一些不同的线程池。如何实现组内元素的组并行和顺序执行。
public class ReactorTest implements SmartLifecycle, ApplicationListener<ApplicationReadyEvent> {
private AtomicInteger counter = new AtomicInteger(1);
private Many<Integer> healthSink;
private Disposable dispose;
private ScheduledExecutorService executor;
@Override
public void start() {
executor = Executors.newSingleThreadScheduledExecutor();
healthSink = Sinks.many().unicast().onBackpressureBuffer();
dispose = healthSink.asFlux().groupBy(v -> v % 3 == 0).parallel(10)
.runOn(Schedulers.newBoundedElastic(10, 100, "k-task")).log().flatMap(v -> v)
.subscribe(v -> log.info("Data {}", v));
}
@Override
public void stop() {
executor.shutdownNow();
if (dispose != null) {
dispose.dispose();
}
}
@Override
public boolean isRunning() {
return executor == null ? false : !executor.isShutdown();
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
executor.scheduleAtFixedRate(() -> {
healthSink.tryEmitNext(counter.incrementAndGet());
healthSink.tryEmitNext(counter.incrementAndGet());
healthSink.tryEmitNext(counter.incrementAndGet());
}, 10, 10, TimeUnit.SECONDS);
}
}
日志
2021-07-27 14:15:34.189 INFO 22212 --- [ restartedMain] i.g.kprasad99.reactor.DemoApplication : Started DemoApplication in 1.464 seconds (JVM running for 1.795)
2021-07-27 14:15:44.206 INFO 22212 --- [ k-task-1] reactor.Parallel.RunOn.1 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207 INFO 22212 --- [ k-task-2] reactor.Parallel.RunOn.1 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207 INFO 22212 --- [ k-task-1] io.github.kprasad99.reactor.ReactorTest : Data 2
2021-07-27 14:15:44.207 INFO 22212 --- [ k-task-2] io.github.kprasad99.reactor.ReactorTest : Data 3
2021-07-27 14:15:44.207 INFO 22212 --- [ k-task-1] io.github.kprasad99.reactor.ReactorTest : Data 4
2021-07-27 14:15:54.200 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 5
2021-07-27 14:15:54.200 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 6
2021-07-27 14:15:54.200 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 7
2021-07-27 14:16:04.195 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 8
2021-07-27 14:16:04.195 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 9
2021-07-27 14:16:04.195 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 10
2021-07-27 14:16:14.206 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 11
2021-07-27 14:16:14.206 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 12
2021-07-27 14:16:14.206 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 13
2021-07-27 14:16:24.197 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 14
2021-07-27 14:16:24.197 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 15
2021-07-27 14:16:24.197 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 16
2021-07-27 14:16:34.196 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 17
2021-07-27 14:16:34.196 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 18
2021-07-27 14:16:34.196 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 19
2021-07-27 14:16:44.201 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 20
2021-07-27 14:16:44.201 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 21
2021-07-27 14:16:44.201 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 22
2021-07-27 14:16:54.201 INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest : Data 23
解决方案
您需要在运算符.parallel(..)
之后.flatMap(..)
放置:
Flux.interval(Duration.ofMillis(100))
.take(10)
.groupBy(v -> v % 2 == 0)
.flatMap(f -> f)
.parallel(2)
.runOn(Schedulers.newBoundedElastic(2, 10, "k-task"))
.subscribe(i -> log.info("Data {}", i));
结果:
10:32:33.377 [k-task-1] INFO Data 0
10:32:33.466 [k-task-2] INFO Data 1
10:32:33.562 [k-task-1] INFO Data 2
10:32:33.673 [k-task-2] INFO Data 3
10:32:33.766 [k-task-1] INFO Data 4
10:32:33.860 [k-task-2] INFO Data 5
10:32:33.971 [k-task-1] INFO Data 6
10:32:34.065 [k-task-2] INFO Data 7
10:32:34.163 [k-task-1] INFO Data 8
10:32:34.268 [k-task-2] INFO Data 9
推荐阅读
- kubernetes - MicroK8S 部署网络服务器
- python - 重新计算聚类质心
- flutter - 错误:为 ios 构建颤振应用程序时“找不到 -lStripe 的库”
- android - Android房间数据库创建没有ID的实体对象
- node.js - 使用网络语音 API 的网络应用程序对话流中的上下文不起作用
- python - 使用 Python 以大写形式保存文本文件
- c# - 工厂的依赖注入生命周期
- c++ - C++/QT继承程序架构
- c# - 为什么我会丢失信息?
- python-3.x - 我正在使用 openpyxl 它打开一些 xl 文件并且无法打开具有相同格式的其他 xl 文件