java - Vertx HTTPClient with CompletableFuture block the callback Thread
问题描述
I am facing a very weird problem.
I am working on Vert.x and from handler I am calling REST APIs using HttpClientRequest
of Vert.x. Now I am having a CompletableFuture
which I am completing in the response handler of the HttpClientRequest
. Later, I am using CompletableFuture.get()
. But whenever get()
method is called, the main thread is blocked (as expected), but it remains blocked forever. I am not seeing the callback happen on my response Handler and it is stuck for forever.
Here is code:
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonObject;
import java.util.concurrent.CompletableFuture;
import io.vertx.core.http.HttpClient;
CompletableFuture<JsonObject> comp = new CompletableFuture<JsonObject>();
HttpClient httpClient = new HttpClient(); //This object initialized and set the endpoit, port and domain name.
HttpClientRequest request = httpClient.request(HttpMethod.POST, requestURI, response -> {
response.bodyHandler(body -> {
//do some process
comp.complete(new JsonObject(body);
});
}).exceptionHandler(e -> {
//log the error
comp.completeExceptionally(e);
});
request.end();
//after some process
comp.get(); // here main thread is stuck forever.
My API gives 200 response, I saw in it Wireshark and also If I do comp.thenAccept()
the callback is executed and it gives my result.
Why is this happening and what is the solution?
Note: I know that it is not recommendation to use Completable.get()
method but in my use-case, I have to use it.
Here is sample code which is giving me issue:
package io.vertx.starter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.*;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import java.util.concurrent.CompletableFuture;
public class SampleVerticle extends AbstractVerticle {
public void start ( Future startFuture ) throws Exception {
Future<Void> future = Future.future ();
HttpServer server = vertx.createHttpServer ();
Router router = Router.router (vertx);
router.get ("/sample").handler (this::sampeHandler);
router.get ("/testcompletableblocking").handler (this::testCompBlocking);
router.get ("/testcompletablenonblocking").handler (this::testCompNonBlocking);
server.requestHandler (router::accept) // <5>
.listen (8080, ar -> { // <6>
if (ar.succeeded ()) {
System.out.println ("Server started");
future.complete ();
} else {
System.out.println ("Server is not started");
future.fail (ar.cause ());
}
});
}
private void sampeHandler ( RoutingContext context ) {
try {
Thread.sleep (1000);
} catch (Exception e) {
}
String response = "Hello...";
context.response ().setStatusCode (200).putHeader ("content-type", "text/html").end (response);
}
private void testCompBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setSsl (false).setKeepAlive (true);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Success";
try {
result = comp.get ();
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompBlocking....");
}
private void testCompNonBlocking ( RoutingContext context ) {
System.out.println ("Calling testCompNonBlocking....");
HttpClientOptions clientOptions = new HttpClientOptions ().setDefaultHost ("localhost").setDefaultPort (8080).setKeepAlive (false);
HttpClient client = vertx.createHttpClient (clientOptions);
String requestURI = "/sample";
CompletableFuture<String> comp = new CompletableFuture<> ();
HttpClientRequest request = client.request (HttpMethod.GET, requestURI, response -> {
response.bodyHandler (body -> {
String kmsResponse = new String (body.getBytes ());
System.out.println ("kmsResponse-" + kmsResponse);
comp.complete (kmsResponse);
});
}).exceptionHandler (e -> {
e.printStackTrace ();
comp.completeExceptionally (e);
});
request.end ();
String result = "Not Blocking, please see result at Console";
try {
comp.thenAccept (apiResult -> System.out.println ("apiResult from CompletableFuture - " + apiResult));
} catch (Exception e) {
System.out.println ("Exception in getting from Completable..." + e.getMessage ());
e.printStackTrace ();
}
context.response ().setStatusCode (200);
context.response ().putHeader ("content-type", "text/html");
context.response ().end (result);
System.out.println ("end testCompNonBlocking....");
}
}
Call localhost:8080/testcompletableblocking
, response is not sent and current thread is blocked forever.
解决方案
The problem with your implementation is that it violates The Golden Rule - Don’t Block the Event Loop. You should not call a blocking operation like CompletableFuture.get()
on the event loop. Similarly, sampleHandler()
should not call Thread.sleep()
on the event loop either, but that's a lesser problem.
The consequence is that your event loop is now blocked… so your /sample
request cannot be processed anymore. And since the request is not processed, you CompletableFuture
remains uncompleted… deadlock.
There are two possible solutions to this problem:
Use
CompletableFuture
as designed, relying on chained calls instead ofget()
, though it does not enforce Vert.x's threading model. So for example:comp.whenComplete((result, e) -> { System.out.println("Got sample response"); if (e != null) { context.response().setStatusCode(500) .end(e.getMessage()); } else { context.response().setStatusCode(200) .putHeader("content-type", "text/html") .end(result); } System.out.println("end testCompBlocking...."); });
Use Vert.x facilities for running blocking code. This shouln't be necessary with
CompletableFuture
but other API's might require it. So for example:context.vertx().<String>executeBlocking(future -> { String result = "Not Success"; try { result = comp.get(); } catch (Exception e) { System.out.println("Exception in getting from Completable..." + e.getMessage()); e.printStackTrace(); } future.complete(result); }, false, result -> { context.response().setStatusCode(200); context.response().putHeader("content-type", "text/html"); context.response().end(result.result()); System.out.println("end testCompBlocking...."); });
推荐阅读
- javascript - 如何在 PeerJs peer.call(id, stream, [options]) 函数中设置元数据?
- flutter - 如何在 Flutter 中手动计算布局
- sql-server - 在 SQL Server 中制作摘要报告
- python - 使用 python 或 node.js 将平面文件从 SFTP 上传到 S3
- google-sheets - 谷歌表格公式在创建新表格之前不起作用
- python-3.x - 如何合并单元格而不丢失 libreoffice-calc 中的数据?
- django - 为什么我的 websocket 在 Django Channels App 中不断断开连接?
- ios - 本机:未安装 InAppBrowser 或您正在浏览器上运行。回退到window.open。在 iOS 上
- c# - 如何使用 C#/.NET 使用多个“Where”表达式并将它们与 AND 和 OR 链接在一起?
- regex - 在 vb.net 中使用正则表达式