首页 > 解决方案 > 如何验证 servlet 处理是否真的在做非阻塞 io?

问题描述

我正在尝试使用非阻塞 io 接收 http 请求,然后使用非阻塞 io 向另一台服务器发出另一个 http 请求,并返回一些响应,这是我的 servlet 的代码:

package learn;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.core.Response;

import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder;

/**
 * Servlet that handles {@code GET /async} without parking the container thread:
 * it drains the request body through a {@link ReadListener}, issues an
 * asynchronous outbound HTTP call through the JAX-RS client, and writes the
 * upstream status code back through a {@link WriteListener}.
 *
 * <p>Each stage prints the name of the thread it runs on, so the thread
 * hand-offs between stages can be observed in the server log.
 */
@WebServlet(urlPatterns = {"/async"}, asyncSupported = true)
public class AsyncProcessing extends HttpServlet {

    private static final long serialVersionUID = -535924906221872329L;

    /** Size of the scratch buffer used while draining the request body. */
    private static final int BUFFER_SIZE = 4 * 1024;

    /**
     * Outbound client shared by all requests. Created lazily and closed in
     * {@link #destroy()}; {@code transient} because the servlet is serializable
     * and the client is a live resource.
     */
    private transient Client sharedClient;

    /**
     * Reads the whole request body using the container's non-blocking read API.
     *
     * <p>The input stream is deliberately NOT closed here: the listener callbacks
     * fire after this method has returned, so closing the stream on the way out
     * would close it before any data has been read.
     *
     * @param req the request whose body is read; async mode must already be started
     * @return a future completed with the decoded body once all data has been read,
     *         or completed exceptionally if reading fails
     */
    public CompletableFuture<String> readRequestAsync(final HttpServletRequest req) {

        final CompletableFuture<String> request = new CompletableFuture<>();
        final Charset charset = requestCharset(req);
        final ServletInputStream inputStream;
        try {
            inputStream = req.getInputStream();
        } catch (IOException e) {
            request.completeExceptionally(e);
            return request;
        }

        inputStream.setReadListener(new ReadListener() {
            private final byte[] buffer = new byte[BUFFER_SIZE];
            // Raw bytes are accumulated and decoded once at the end, so a
            // multi-byte character split across two reads is not corrupted.
            private final ByteArrayOutputStream body = new ByteArrayOutputStream();

            @Override
            public void onError(Throwable t) {
                request.completeExceptionally(t);
            }

            @Override
            public void onDataAvailable() {
                if (inputStream.isFinished()) {
                    return;
                }
                System.out.println("----------------------------------------");
                System.out.println("onDataAvailable: " + Thread.currentThread().getName());
                try {
                    // Read only while the container reports data is ready;
                    // when isReady() turns false this callback is invoked again later.
                    while (inputStream.isReady()) {
                        int length = inputStream.read(buffer);
                        if (length < 0) {
                            // End of stream: nothing to append.
                            break;
                        }
                        body.write(buffer, 0, length);
                    }
                } catch (IOException ex) {
                    request.completeExceptionally(ex);
                }
            }

            @Override
            public void onAllDataRead() throws IOException {
                request.complete(new String(body.toByteArray(), charset));
            }
        });

        return request;
    }

    /**
     * Picks the charset used to decode the request body: the encoding declared
     * on the request when it is present and supported, UTF-8 otherwise.
     */
    private static Charset requestCharset(HttpServletRequest req) {
        final String declared = req.getCharacterEncoding();
        if (declared == null) {
            return StandardCharsets.UTF_8;
        }
        try {
            return Charset.forName(declared);
        } catch (IllegalArgumentException unsupported) {
            // Illegal or unsupported charset name sent by the client: fall back
            // to UTF-8 rather than failing the whole request.
            return StandardCharsets.UTF_8;
        }
    }

    /** Builds a JAX-RS client backed by RESTEasy's asynchronous HTTP engine. */
    private Client createAsyncHttpClient() {
        ResteasyClientBuilder restEasyClientBuilder = (ResteasyClientBuilder)ClientBuilder.newBuilder();

        return restEasyClientBuilder.useAsyncHttpEngine().connectTimeout(640, TimeUnit.SECONDS).build();
    }

    /**
     * Returns the shared outbound client, creating it on first use, so that a
     * new client is not built (and leaked) for every request.
     */
    private synchronized Client httpClient() {
        if (sharedClient == null) {
            sharedClient = createAsyncHttpClient();
        }
        return sharedClient;
    }

    /** Closes the shared outbound client when the servlet is taken out of service. */
    @Override
    public void destroy() {
        synchronized (this) {
            if (sharedClient != null) {
                sharedClient.close();
                sharedClient = null;
            }
        }
        super.destroy();
    }

    /**
     * Issues an asynchronous GET to {@code http://localhost:3000}.
     *
     * @param httpRequest the body of the incoming request (currently not forwarded)
     * @return a future completed with the upstream response, or completed
     *         exceptionally if the call fails; the receiver of the response is
     *         responsible for closing it
     */
    public CompletableFuture<Response>  process(String httpRequest){
        System.out.println("----------------------------------------");
        System.out.println("process: " + Thread.currentThread());

        final CompletableFuture<Response> futureResponse = new CompletableFuture<>();

        httpClient().target("http://localhost:3000").request().async().get(new InvocationCallback<Response>() {
            @Override
            public void completed(Response response) {
                System.out.println("----------------------------------------");
                System.out.println("completed: " + Thread.currentThread());
                futureResponse.complete(response);
            }

            @Override
            public void failed(Throwable throwable) {
                System.out.println(throwable);
                futureResponse.completeExceptionally(throwable);
            }
        });

        return futureResponse;
    }

    /**
     * Writes the upstream status code to the servlet response using the
     * container's non-blocking write API, then closes the upstream response.
     *
     * <p>The output stream is deliberately NOT closed here: the write happens in
     * a listener callback after this method has returned.
     *
     * @param httpResponseData the upstream response; closed by this method
     * @param resp the servlet response to write to
     * @return a future completed with the upstream response's reported length
     *         once the status has been written, or completed exceptionally on failure
     */
    public CompletableFuture<Integer> outputResponseAsync(Response httpResponseData, HttpServletResponse resp){
        System.out.println("----------------------------------------");
        System.out.println("outputResponseAsync: " + Thread.currentThread().getName());

        final CompletableFuture<Integer> total = new CompletableFuture<>();
        final ServletOutputStream outputStream;
        try {
            outputStream = resp.getOutputStream();
        } catch (IOException e) {
            System.out.println(e);
            httpResponseData.close();
            total.completeExceptionally(e);
            return total;
        }

        outputStream.setWriteListener(new WriteListener() {

            @Override
            public void onWritePossible() throws IOException {
                System.out.println("----------------------------------------");
                System.out.println("onWritePossible: " + Thread.currentThread().getName());

                final int status;
                final int length;
                try {
                    status = httpResponseData.getStatus();
                    length = httpResponseData.getLength();
                } finally {
                    // Release the upstream connection as soon as the metadata is captured.
                    httpResponseData.close();
                }
                outputStream.print(status);
                total.complete(length);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t);
                httpResponseData.close();
                total.completeExceptionally(t);
            }
        });

        return total;
    }

    /**
     * Starts async processing and chains the three stages: read the request,
     * call the upstream server, write the response. The async context is
     * completed whether the chain succeeds or fails, so a failed stage cannot
     * leave the request hanging until the container's async timeout.
     */
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        System.out.println("----------------------------------------");
        System.out.println("doGet: " + Thread.currentThread().getName());
        final AsyncContext asyncContext = req.startAsync();
        readRequestAsync(req)
            .thenCompose(this::process)
            .thenCompose(httpResponseData -> outputResponseAsync(httpResponseData, resp))
            .whenComplete((ignored, failure) -> {
                try {
                    if (failure != null) {
                        System.out.println(failure);
                        if (!resp.isCommitted()) {
                            resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                        }
                    }
                } finally {
                    asyncContext.complete();
                }
            });
    }
}

http://localhost:3000 上的服务器是一个用 Node.js 编写的 HTTP 服务器,它会在 27 秒后返回响应。我想向 Node.js 服务器发出一个请求,并在该请求处理期间再向 servlet 发出另一个 HTTP 请求,以查看是否使用了同一个线程。目前我正在尝试使用 Payara 5.194 来做这个实验,但即使我把两个线程池都设置为只有一个线程,应用服务器似乎仍会创建另一个线程。所以,我想请教各位:这个 servlet 是否真的在做非阻塞 IO,并且在任何时候都没有阻塞?如果能通过一些实验来确认这一点,那就太棒了。我认为有必要指出,ServletInputStream 类是 InputStream 的子类,所以我真的不确定它是否是非阻塞 IO。谢谢。

标签: java, http, servlets, nonblocking, jakarta-ee

解决方案


推荐阅读