java - Java 中带有 cachedHostConnectionPool 的 Source.queue
问题描述
我正在尝试在 java 中将 source.queue 与 cachedHostConnectionPool 一起使用,但无法获得正确的输出。下面是我的尝试。我总是得到成功的回应,我理解为什么,但不确定什么是正确的方法。
我得到成功响应的原因是因为我已将 Try 定义为 HttpResponse.create() ,它总是给我成功响应,但我怀疑为什么在我执行 queue.offer(Pair.create(httprequest) 时它没有被覆盖,承诺))。有人可以帮我理解我在这里做错了什么。
// Method to return queue
public SourceQueueWithComplete<Object> httpGetRequestWithPool
(Materializer materializer)
{
final Http http = Http.get(system);
final Flow<Pair<HttpRequest, Object>, Pair<Try<HttpResponse>, Object>, HostConnectionPool> flow;
flow = http.cachedHostConnectionPool(ConnectHttp.toHost("host:port"));
final SourceQueueWithComplete<Object> queue= Source.queue(3,OverflowStrategy.dropNew())
.map(d -> Pair.create(HttpRequest.create("contextpath"),d))
.via(flow)
.toMat(Sink.foreach(p -> p.first()), Keep.left())
.run(materializer);
return queue;
}
// Here i am trying to use.
SourceQueueWithComplete<Object> queue = util.httpGetRequestWithPool(materializer);
Source<Object, NotUsed> source = Source.from(ListOfObject);
source.mapAsyncUnordered(3,x-> {
String url = util.getServiceUrl("servicename").append(x.getId).toString();
HttpRequest httpRequest = HttpRequest.create(url);
Try<HttpResponse> promise = Try.apply(()->HttpResponse.create());
queue.offer(Pair.create(httpRequest,promise))
.thenCompose(result -> {
if(result instanceof QueueOfferResult.Enqueued$){
return CompletableFuture.completedFuture(promise)
.thenApply(res ->{
if(res.get().status().intValue()==200){
System.out.println("success");
}
return res;
});
}
else{
return CompletableFuture.completedFuture(HttpResponse.create().withStatus(StatusCodes.SERVICE_UNAVAILABLE));
}
});
return null;
}).run(materializer);
```
解决方案
推荐阅读
- git - git 将部分更改从“待提交”移动到本地更改
- r - 在 data.frame 中传播和排序连接的因子变量
- jquery - 如何根据所选选项包含特定表格
- android - React-Native-无法解析配置 ':classpath' 的所有文件。找不到 com.android.tools.build:gradle:3.4.1
- c++ - 观察者模式是以下场景的最佳设计吗?
- android - 在这种情况下,内存泄漏是如何发生的?
- sql - 在 MS SQL Server 中将字符串转换为日期
- java - Spring boot 2.1.2.RELEASE 在多部分文件请求中出现特殊字符问题
- office365api - PowerApp 无法获取所有用户状态
- servicestack - ServiceStack - 字典内的数组反序列化为字符串