mongodb - 在 Mono 对象上执行 block() 时出现异常我从 ReactiveMongoRepository 对象返回
问题描述
我有一个服务将数据流式传输到第二个服务,该服务接收对象流并将它们保存到我的 MongoDB。在我从流服务获得的 Flux 对象上的 subscribe 函数中,我使用了 ReactiveMongoRepository 接口中的 save 方法。当我尝试使用块函数并获取数据时,出现以下错误:
2019-10-11 13:30:38.559 INFO 19584 --- [localhost:27017] org.mongodb.driver.connection : Opened connection [connectionId{localValue:1, serverValue:25}] to localhost:27017
2019-10-11 13:30:38.566 INFO 19584 --- [localhost:27017] org.mongodb.driver.cluster : Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[4, 0, 1]}, minWireVersion=0, maxWireVersion=7, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=6218300}
2019-10-11 13:30:39.158 INFO 19584 --- [ctor-http-nio-4] quote-monitor-service : onNext(Quote(id=null, ticker=AAPL, price=164.8, instant=2019-10-11T10:30:38.800Z))
2019-10-11 13:30:39.411 INFO 19584 --- [ctor-http-nio-4] quote-monitor-service : cancel()
2019-10-11 13:30:39.429 INFO 19584 --- [ntLoopGroup-2-2] org.mongodb.driver.connection : Opened connection [connectionId{localValue:3, serverValue:26}] to localhost:27017
2019-10-11 13:30:39.437 WARN 19584 --- [ctor-http-nio-4] io.netty.util.ReferenceCountUtil : Failed to release a message: DefaultHttpContent(data: PooledSlicedByteBuf(freed), decoderResult: success)
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138) ~[netty-common-4.1.39.Final.jar:4.1.39.Final]
at
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1494) ~[reactor-core-3.2.12.RELEASE.jar:3.2.12.RELEASE]
at
我的代码:
stockQuoteClient.getQuoteStream()
.log("quote-monitor-service")
.subscribe(quote -> {
Mono<Quote> savedQuote = quoteRepository.save(quote);
System.out.println("I saved a quote! Id: " +savedQuote.block().getId());
});
经过一番挖掘,我设法让它工作,但我不明白为什么它现在工作。新代码:
stockQuoteClient.getQuoteStream()
.log("quote-monitor-service")
.subscribe(quote -> {
Mono<Quote> savedQuote = quoteRepository.insert(quote);
savedQuote.subscribe(result ->
System.out.println("I saved a quote! Id :: " + result.getId()));
});
block() 的定义:订阅此 Mono 并无限期阻塞,直到收到下一个信号。
subscribe() 的定义:订阅此 Mono 并请求无限制的需求。
有人可以帮我理解为什么该块不起作用并且订阅起作用了吗?我在这里想念什么?
解决方案
阻塞是不好的,因为它占用了一个等待响应的线程。在一个只有很少线程可供使用的反应式框架中,这是非常糟糕的,并且被设计成它们都不应该被不必要地阻塞。
这正是响应式框架旨在避免的事情,所以在这种情况下,它只会阻止你这样做:
block()/blockFirst()/blockLast() 是阻塞的,线程 reactor-http-nio-4 不支持
相比之下,您的新代码异步工作。线程不会被阻塞,因为在存储库返回一个值之前实际上什么都不会发生(然后savedQuote.subscribe()
执行您传递给的 lambda,将结果打印到控制台。)
但是,从反应流的角度来看,新代码仍然不是最佳/正常的,因为您正在 subscribe 方法中执行所有逻辑。正常的做法是对我们进行一系列 flatMap/map 调用来转换流中的项目,并doOnNext()
用于副作用(例如打印出一个值):
stockQuoteClient.getQuoteStream()
.log("quote-monitor-service")
.flatMap(quoteRepository::insert)
.doOnNext(result -> System.out.println("I saved a quote! Id :: " + result.getId())))
.subscribe();
如果您正在对反应器/反应流进行大量工作,那么一般来说值得阅读它们。它们对于非阻塞工作非常强大,但与更“标准”的 Java 相比,它们确实需要一种不同的思维方式(和编码方式)。
推荐阅读
- angular6 - Angular 6 AWS Cognito 如何处理 newPasswordRequired
- angular - 如何在 *ngIf 的 else 部分中指向其他组件
- azure-devops - 无法使用命令行 'dotnet .\myapi.dll' 启动进程,ErrorCode = '0x80004005 : 80008096
- python - 从 tkinter Python 导入东西
- java - 并行运行同一个 Oozie 作业多次,每个作业的参数值不同
- java - 找不到元素(Selenium、Java)
- android - 在 Android 中,当我在画布上绘制一个点时,当我移动、缩放或旋转位图时,它不会停留在位图上
- php - 如何使用 mysqli 准备好的语句安全地更新和检索字符串?
- spring - Spring 控制器变量不在视图页面中打印
- reactjs - 如何解决 babel 依赖?