java - 如何验证 servlet 处理是否真的在做非阻塞 io?
问题描述
我正在尝试使用非阻塞 io 接收 http 请求,然后使用非阻塞 io 向另一台服务器发出另一个 http 请求,并返回一些响应,这是我的 servlet 的代码:
package learn;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Response;
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;
/**
 * Servlet mapped to {@code /async} that reads the request body through a
 * {@link ReadListener}, issues an asynchronous GET to
 * {@code http://localhost:3000} through a JAX-RS client, and writes the
 * upstream status code back through a {@link WriteListener}.
 *
 * <p>Each stage prints the name of the thread it runs on so the threading
 * behaviour of the container can be observed.
 */
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncProcessing extends HttpServlet {
    private static final long serialVersionUID = -535924906221872329L;

    /** Size of the scratch buffer used when draining the request body. */
    private static final int BUFFER_SIZE = 4 * 1024;

    /**
     * Shared outbound HTTP client. Created in {@link #init()} and closed in
     * {@link #destroy()} instead of once per request, so that client
     * resources are not leaked on every call to {@link #process(String)}.
     */
    private transient Client client;

    /** Builds the shared asynchronous HTTP client. */
    @Override
    public void init() throws ServletException {
        client = createAsyncHttpClient();
    }

    /** Closes the shared HTTP client, if one was created. */
    @Override
    public void destroy() {
        final Client toClose = client;
        client = null;
        if (toClose != null) {
            toClose.close();
        }
    }

    /**
     * Reads the whole request body through a {@link ReadListener}.
     *
     * <p>The request must already be in asynchronous mode before this is
     * called (see {@link #doGet}).
     *
     * @param req the request whose body is read
     * @return a future completed with the decoded body once all data has been
     *         read, or completed exceptionally if reading or decoding fails
     */
    public CompletableFuture<String> readRequestAsync(final HttpServletRequest req) {
        final CompletableFuture<String> request = new CompletableFuture<>();
        // Raw bytes are accumulated and decoded once at the end, so a
        // multi-byte character split across two reads is not corrupted.
        final ByteArrayOutputStream body = new ByteArrayOutputStream();
        try {
            // Deliberately NOT try-with-resources: the listener is invoked
            // after this method returns, so the stream must stay open.
            final ServletInputStream inputStream = req.getInputStream();
            inputStream.setReadListener(new ReadListener() {
                private final byte[] buffer = new byte[BUFFER_SIZE];

                @Override
                public void onError(Throwable t) {
                    request.completeExceptionally(t);
                }

                @Override
                public void onDataAvailable() {
                    if (inputStream.isFinished()) {
                        return;
                    }
                    System.out.println("----------------------------------------");
                    System.out.println("onDataAvailable: " + Thread.currentThread().getName());
                    try {
                        int length;
                        // read() returns -1 at end of stream; stop instead of
                        // passing a negative length on.
                        while (inputStream.isReady() && (length = inputStream.read(buffer)) != -1) {
                            body.write(buffer, 0, length);
                        }
                    } catch (IOException ex) {
                        request.completeExceptionally(ex);
                    }
                }

                @Override
                public void onAllDataRead() throws IOException {
                    try {
                        request.complete(new String(body.toByteArray(), bodyCharset(req)));
                    } catch (Exception e) {
                        // Includes an unsupported or illegal charset name.
                        request.completeExceptionally(e);
                    }
                }
            });
        } catch (IOException e) {
            request.completeExceptionally(e);
        }
        return request;
    }

    /**
     * Chooses the charset for decoding the request body: the encoding declared
     * on the request when present, UTF-8 otherwise.
     */
    private static Charset bodyCharset(HttpServletRequest req) {
        final String declared = req.getCharacterEncoding();
        return declared != null ? Charset.forName(declared) : StandardCharsets.UTF_8;
    }

    /** Builds a RESTEasy client configured with the async HTTP engine. */
    private Client createAsyncHttpClient() {
        ResteasyClientBuilder restEasyClientBuilder = (ResteasyClientBuilder) ClientBuilder.newBuilder();
        return restEasyClientBuilder.useAsyncHttpEngine().connectTimeout(640, TimeUnit.SECONDS).build();
    }

    /**
     * Issues an asynchronous GET to {@code http://localhost:3000} using the
     * shared client.
     *
     * @param httpRequest the body of the incoming request (currently unused)
     * @return a future completed with the upstream response, or completed
     *         exceptionally if the call fails
     */
    public CompletableFuture<Response> process(String httpRequest) {
        System.out.println("----------------------------------------");
        System.out.println("process: " + Thread.currentThread());
        final CompletableFuture<Response> futureResponse = new CompletableFuture<>();
        client.target("http://localhost:3000").request().async().get(new InvocationCallback<Response>() {
            @Override
            public void completed(Response response) {
                System.out.println("----------------------------------------");
                System.out.println("completed: " + Thread.currentThread());
                futureResponse.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                System.out.println(throwable);
                futureResponse.completeExceptionally(throwable);
            }
        });
        return futureResponse;
    }

    /**
     * Writes the upstream status code to the servlet response through a
     * {@link WriteListener}.
     *
     * @param httpResponseData the upstream response whose status is written
     * @param resp             the servlet response to write to
     * @return a future completed with {@code httpResponseData.getLength()}
     *         after the status has been written, or completed exceptionally
     *         on failure
     */
    public CompletableFuture<Integer> outputResponseAsync(Response httpResponseData, HttpServletResponse resp) {
        System.out.println("----------------------------------------");
        System.out.println("outputResponseAsync: " + Thread.currentThread().getName());
        final CompletableFuture<Integer> total = new CompletableFuture<>();
        try {
            // Deliberately NOT try-with-resources: onWritePossible runs after
            // this method returns, so the stream must still be open then.
            final ServletOutputStream outputStream = resp.getOutputStream();
            outputStream.setWriteListener(new WriteListener() {
                @Override
                public void onWritePossible() throws IOException {
                    System.out.println("----------------------------------------");
                    System.out.println("onWritePossible: " + Thread.currentThread().getName());
                    outputStream.print(httpResponseData.getStatus());
                    total.complete(httpResponseData.getLength());
                }

                @Override
                public void onError(Throwable t) {
                    System.out.println(t);
                    total.completeExceptionally(t);
                }
            });
        } catch (IOException e) {
            System.out.println(e);
            total.completeExceptionally(e);
        }
        return total;
    }

    /**
     * Puts the request into asynchronous mode and chains read, upstream call
     * and write. The async context is completed whether the chain succeeds
     * or fails, so a failing stage does not leave the request hanging.
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println("----------------------------------------");
        System.out.println("doGet: " + Thread.currentThread().getName());
        final AsyncContext asyncContext = req.startAsync();
        readRequestAsync(req)
            .thenCompose(this::process)
            .thenCompose(httpResponseData ->
                outputResponseAsync(httpResponseData, resp)
                    // Release the upstream response once it has been relayed.
                    .whenComplete((written, writeError) -> httpResponseData.close()))
            .whenComplete((ignored, error) -> {
                try {
                    if (error != null) {
                        System.out.println(error);
                        if (!resp.isCommitted()) {
                            resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                        }
                    }
                } finally {
                    asyncContext.complete();
                }
            });
    }
}
位于 http://localhost:3000 的服务器是一个用 Node.js 编写的 http 服务器,它会在 27 秒后才返回响应。我想先向 Node 服务器发出一个请求,并在该请求处理期间再向 servlet 发出另一个 http 请求,以查看是否使用了同一个线程。目前我正在用 payara 5.194 做这个实验,但即使我把两个线程池都设置为只有一个线程,应用服务器似乎仍会创建另一个线程。所以,我想请教各位:这个 servlet 是否真的在做非阻塞 io,并且在任何时候都没有阻塞?如果能通过一些实验来确认这一点,那就太棒了。我认为有必要指出,类 ServletInputStream 是 InputStream 的子类,所以我真的不确定这是否是非阻塞 io。谢谢。
解决方案
推荐阅读
- visual-studio-code - 如何按名称在扩展程序中获取 vscode 终端?
- javascript - 不带引号的 JSON 属性值
- php - 如果未在数据库中设置值,则不运行
- ios - GMT 中的当前日期与 DateFormatter
- flutter - 使用 BottomSheet 时未找到 Scaffold 小部件
- flutter - 如何在 Flutter 中创建水平列表
- mysql - 如何修复 #1366 - 不正确的字符串值:'\xE6\x10\x00\x00\x01\x01...'
- .net-core - Prism 可以使用 .NET Core 的内置依赖注入吗?
- python - importlib.import_module 忽略在 __init__.py 中进行的重新导出
- macos-catalina - 如何在 macOS Catalina 上安装 gmp4