首页 > 解决方案 > 如何在 Undertow 的非阻塞处理程序中执行阻塞代码?

问题描述

另一个问题中所述,使用 Undertow 时,所有处理都应在专用 Worker 线程池中完成,如下所示:

public class Start {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {
            if (exchange.isInIoThread()) {
              exchange.dispatch(this);
              return;
            }
            exchange.getResponseHeaders()
                    .put(Headers.CONTENT_TYPE, "text/plain");
            exchange.getResponseSender()
                    .send("Hello World");
          }
        })
        .build();
    server.start();
  }
}

我知道这BlockingHandler可用于明确告诉 Undertow 将请求安排在专用线程池上以阻塞请求。我们可以通过将 包装HttpHandler在 的实例中来调整上面的示例BlockingHandler,如下所示:

        .setHandler(new BlockingHandler(new HttpHandler() {

这适用于我们知道总是阻塞的呼叫。

但是,如果某些代码大部分时间都是非阻塞的,但有时需要阻塞调用,那么如何将阻塞调用变成非阻塞调用呢?例如,如果请求的值存在于缓存中,则以下代码不会阻塞(它只是从 some 中获取Map<>),但如果不是,则必须从数据库中获取。

public class Start {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {
            if (exchange.isInIoThread()) {
              exchange.dispatch(this);
              return;
            }
            if (valueIsPresentInCache(exchange)) {
              return valueFromCache;  // non-blocking
            } else {
              return fetchValueFromDatabase(); // blocking!!!
            }
          }
        })
        .build();
    server.start();
  }
}

根据文档,有一种方法HttpServerExchange.startBlocking(),但根据 JavaDoc,除非您确实需要使用输入流,否则此调用仍然是阻塞的。

调用此方法会将交换置于阻塞模式,并创建一个 BlockingHttpExchange 对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了目前阻塞和非阻塞模式之间没有主要区别

如何将这个阻塞调用变成非阻塞调用?

标签: javaasynchronousundertow

解决方案


正确的方法是在 IO 线程中实际执行逻辑,如果它是非阻塞的。否则,将请求委托给专用线程,如下所示:

public class Example {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {

            if (valueIsPresentInCache(exchange)) {
              getValueFromCache();  // non-blocking, can be done from IO thread           
            } else {

              if (exchange.isInIoThread()) {
                exchange.dispatch(this);
                // we return immediately, otherwise this request will be
                // handled both in IO thread and a Worker thread, throwing
                // an exception
                return;
              }

              fetchValueFromDatabase(); // blocking!!!

            }
          }
        })
        .build();
    server.start();
  }
}

推荐阅读