java - 当 HttpClient 订阅时,Reactor Netty 没有得到 HttpServer 响应,仅当 HttpClient 阻塞时
问题描述
使用 github 中的示例 HttpClient/HttpServer 示例,当 HttpClient 订阅而不是阻止时,我试图打印来自示例 HttpServer 的响应。这是使用的示例 HttpServer:
public class Server {
public static void main(String[] args) {
DisposableServer server =
HttpServer.create()
.host("10.0.0.19")
.port(61005)
.route(routes ->
routes.get("/hello",
(request, response) -> response.sendString(Mono.just("Hello World!")))
.post("/echo",
(request, response) -> response.sendString(Mono.just("Hello World!"))))
.bindNow();
server.onDispose()
.block();
}
}
以下是 HttpClient 和 HttpServer 使用的 Maven 依赖项:
<dependencies>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
如果 HttpClient 阻塞,请求和响应可以正常工作,如下阻塞代码:
public class Client {
private static final Logger log = Logger.getLogger(Client.class.getSimpleName());
public static void main(String[] args) {
String responseStr = HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
.port(61005)
.post()
.uri("/echo")
.send(ByteBufFlux.fromString(Mono.just("Hello")))
.responseContent()
.aggregate()
.asString()
.block();
System.out.println(responseStr);
}
}
但是如果HttpClient订阅了onSuccess、onError和onCompletion回调,则没有响应,即onSuccess、onError和onCompletion都没有执行:
public class Client {
private static final Logger log = Logger.getLogger(Client.class.getSimpleName());
public static void main(String[] args) {
Consumer<String> onSuccess = (String response) -> {
log.info("response in onSuccess: "+response);
};
Consumer<Throwable> onError = (Throwable ex) -> {
ex.getMessage();
};
Runnable onCompletion = () -> {
System.out.println("Message Completed");
};
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
.port(61005)
.post()
.uri("/echo")
.send(ByteBufFlux.fromString(Mono.just("Hello")))
.responseContent()
.aggregate()
.asString()
.subscribe(onSuccess, onError, onCompletion);
}
}
我找不到使用 subscribe() 的示例。在上面使用 subscribe() 的 HttpClient 示例中,HttpServer 似乎没有返回响应。任何煽动为什么会发生这种情况都会有所帮助。
解决方案
在 Reactor Netty 示例中,.block
用于保持main
线程处于活动状态,因为我们不希望线程退出。当您使用时,.subscribe
您需要另一种机制来保持main
线程处于活动状态并防止其退出。在您的示例中发生的是main
线程退出并且您看不到结果。
您可以使用例如CountDownLatch
public class Client {
private static final Logger log = Logger.getLogger(Client.class.getSimpleName());
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
Consumer<String> onSuccess = (String response) -> {
log.info("response in onSuccess: "+response);
};
Consumer<Throwable> onError = (Throwable ex) -> {
ex.getMessage();
latch.countDown();
};
Runnable onCompletion = () -> {
System.out.println("Message Completed");
latch.countDown();
};
HttpClient.create()
.tcpConfiguration(tcpClient -> tcpClient.host("10.0.0.19"))
.port(61005)
.post()
.uri("/echo")
.send(ByteBufFlux.fromString(Mono.just("Hello")))
.responseContent()
.aggregate()
.asString()
.subscribe(onSuccess, onError, onCompletion);
latch.await();
}
}
推荐阅读
- javascript - 数组移除、“未定义”和 JavaScript
- apache-spark - 使用 Kafka 进行结构化流式处理的依赖项是什么?
- javascript - 如何为 JavaScript 函数创建文本字段?
- javascript - axios ajax,发出ajax请求时显示加载
- assembly - 如何使用 NVIDIA 的 PTX 代码在屏幕上绘制图形?
- java - 路径参数中的 URL
- c# - 我如何编译(运行)只是 C# 中的一部分代码?
- mysql - 估计列大小
- python - BeautifulSoup 无法解析网站
- powershell - 如何在 SharePoint Online 中更新托管元数据文件属性