mongodb - 如何正确地将数据保存到 MongoDb 使用 CompletableFuture 在无阻塞堆栈中抛出 Spring-data
问题描述
这个问题可以总结一下:如何正确地将数据保存到 MongoDb 使用 CompletableFuture(即 Spring Webflux + reactive.ReactiveCrudRepository + java.util.concurrent)在无阻塞堆栈中抛出 Spring-data?
在过去的三天里,我一直在努力学习和搜索并阅读几个教程,以便在有人想要使用 CompletableFuture 时找到推荐的方法或至少是“北路径”来持久化数据。我可以成功地达到下面的代码,但我不确定我是否在做一些奇怪的事情。
基本上,我想使用 CompletableFuture 因为我想链接期货。比方说,首先保存在 MongoDb 中,如果做得好,然后“thenAcceptAsync”,最后“thenCombine”它们。
好吧, ReactiveCrudRepository.save 返回 Mono<> 我必须订阅才能有效地保存它。此外, Mono<>.subscribe() 返回 dispose ,我知道我可以使用它来取消它,假设线程花费太长时间,因为例如 MongoDb 已退出或任何其他异常。到目前为止,一切都很好。
我不清楚的是,如果我没有搞乱使用保存异步方法中阻塞的数据的想法。由于我的目的是留给“未来”的解决方案,我是否在下面的保存方法中被阻塞并完全失去了保存在不同线程中并获得未来结果的好处?
将代码正确保存到 MongoDb 但我不清楚它是否真的是“无阻塞”方法。请注意,completableFuture.get() 已被注释,因为我不需要它来有效地保存我的数据
@Async("taskExecutor")
public void transferirDisposableReturnedSupplyAsync(Extrato e) throws InterruptedException, ExecutionException {
CompletableFuture<Disposable> completableFuture = CompletableFuture
.supplyAsync(() -> extratoRepository.save(e).subscribe());
//completableFuture.get(); unnecessary since subscribe() above already saved it
}
如果相关:存储库:
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import com.noblockingcase.demo.model.Extrato;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.data.domain.Pageable;
public interface ExtratoRepository extends ReactiveCrudRepository<Extrato, String> {
@Query("{ id: { $exists: true }}")
Flux<Extrato> retrieveAllExtratosPaged(final Pageable page);
}
异步配置:
import java.util.concurrent.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
// The @EnableAsync annotation enables Spring’s ability to run @Async methods in a background thread pool.
// The bean taskExecutor helps to customize the thread executor such as configuring number of threads for an application, queue limit size and so on.
// Spring will specifically look for this bean when the server is started.
// If this bean is not defined, Spring will create SimpleAsyncTaskExecutor by default.
@Configuration
@EnableAsync
public class AsyncConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
LOGGER.debug("Creating Async Task Executor");
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("ExtratoThread-");
executor.initialize();
return executor;
}
}
*** 添加
import { Injectable, NgZone } from '@angular/core';
import { Observable } from 'rxjs';
import { Extrato } from './extrato';
@Injectable({
providedIn: "root"
})
export class SseService {
extratos: Extrato[] = [];
constructor(private _zone: NgZone) { }
getServerSentEvent(url: string): Observable<any> {
this.extratos = [];
return Observable.create(observer => {
const eventSource = this.getEventSource(url);
eventSource.onmessage = event => {
this._zone.run(() => {
let json = JSON.parse(event.data);
this.extratos.push(new Extrato(json['id'], json['description'], json['value'], json['status']));
observer.next(this.extratos);
});
};
eventSource.onerror = (error) => {
if (eventSource.readyState === 0) {
console.log('The stream has been closed by the server.');
eventSource.close();
observer.complete();
} else {
observer.error('EventSource error: ' + error);
}
}
});
}
private getEventSource(url: string): EventSource {
return new EventSource(url);
}
}
解决方案
推荐阅读
- asp.net-mvc - 本地 IIS 服务器上 MVC 中的 Hangfire 包响应“网站拒绝显示此网页”
- javascript - React - 解析逗号分隔的用户输入
- java - 如何根据数据是否正确或正确使用 JFrame 输出消息?
- python - Pandas - 将 DataFrame 值除以 MultiIndex DataFrame 中的系列
- c++ - 函数已经有一个字符串变量的主体和重新定义
- java - WARN - 获取ImportedKeys 失败游标先前已被释放且不可用
- amazon-web-services - 将 Prometheus 运算符与 k8s 的 DB 卷一起使用
- javascript - 请问这是什么意思:if (a && b == c){};?
- azure - Azure Troubleshooting HTTP 500: Validate user in memberhsip provider is not called
- excel - How to deal with non-contiguous range when using COUNTIF in VBA UDF?