java - 将 Scala Future 转变为 Reactor Flux
问题描述
我正在调用一个返回 a 的 API Future of Scala
,并且我想转换为Reactor Flux
显然没有阻塞并等待未来的响应。我正在尝试使用 EmitterProcessor,当未来成功时它工作正常,但不幸的是,当未来有超时时它不是。
这里的代码
private Flux<D> transformFutureToFlux(Future<T> future) {
EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
return Flux.from(emitterProcessor);
}
private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
return new OnComplete() {
@Override
public void onComplete(Throwable t, Object result) {
if (t != null) {
processError(t);
} else {
processSucceed(result);
}
}
private void processSucceed(Object result) {
if (result instanceof ConnectorErrorVO) {
publishConnectorErrorException((ConnectorErrorVO) result);
} else {
publishGenericResponse(result);
}
}
private void publishGenericResponse(Object result) {
if (result instanceof List<?>) {
Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
} else {
Flux.just((D) result).subscribe(emitterProcessor);
}
}
private void publishConnectorErrorException(ConnectorErrorVO result) {
ConnectorErrorVO connectorErrorVO = result;
Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
error.subscribe(emitterProcessor);
}
private void processError(Throwable t) {
ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
if (t instanceof AskTimeoutException) {
Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
} else {
Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
}
}
};
我正在尝试做的是正确的?,还有更好的方法吗?
问候
解决方案
如果您返回 a Future
,则意味着只有 1 个返回,而不是一系列/返回值流。您可以根据需要转换为Flux
,但为什么不改用Mono
?
另外,如果您使用的是 scala,请尝试使用reactor-scala-extensions SMono.fromFuture
以 SMonoTest为例。
import scala.concurrent.ExecutionContext.Implicits.global
StepVerifier.create(SMono.fromFuture(Future[Long] {
randomValue
}))
.expectNext(randomValue)
.verifyComplete()
推荐阅读
- arduino-uno - Attiny85 与 ArduinoUno 用于 I2c 通信
- c# - 如何使用 C# 在 Unity 中使用 JsonUtility 从给定的 JSON 格式获取数据?
- javascript - 如何将多个 Ajax Post 调用异步到同一个 URL
- django - 如何根据登录的用户来匹配页面上的不同内容?
- php - 带有子选择的 Doctrine 查询构建器内部联接
- symfony4 - 在 API 平台中保存相关对象
- asp.net-core - .NET Core 操作筛选器不是属性类
- c# - 有没有办法快速检查,字符串与正则表达式匹配的百分比是多少?
- laravel - Laravel google oauth 2回调返回403禁止
- solver - SimCity (BuildIt) 经济的子集可以用 Minizinc 表示吗?