What is the multithreading model in Java NIO 2 (Proactor pattern)?

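Question

I am trying to build a simple echo service using Java NIO 2 (which is based on the Proactor pattern).

In the simplest implementation there are four main components: ProactorInitiator, AcceptCompletionHandler, ReadCompletionHandler and WriteCompletionHandler.

Here is my sample code.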

ProactorInitiator.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

public class ProactorInitiator {
    static final int ASYNC_SERVER_PORT = 4333;

    public void initiateProactiveServer(int port)
            throws IOException {

        final AsynchronousServerSocketChannel listener =
                AsynchronousServerSocketChannel.open().bind(
                        new InetSocketAddress(port));
        AcceptCompletionHandler acceptCompletionHandler =
                new AcceptCompletionHandler(listener);

        SessionState state = new SessionState();
        listener.accept(state, acceptCompletionHandler);
        System.out.println("Proactor Initiator Running on "+Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        try {
            System.out.println("Async server listening on port : " +
                    ASYNC_SERVER_PORT);
            new ProactorInitiator().initiateProactiveServer(
                    ASYNC_SERVER_PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Sleep indefinitely since otherwise the JVM would terminate
        while (true) {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

AcceptCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AcceptCompletionHandler
        implements
        CompletionHandler<AsynchronousSocketChannel, SessionState> {

    private AsynchronousServerSocketChannel listener;

    public AcceptCompletionHandler(
            AsynchronousServerSocketChannel listener) {
        this.listener = listener;
    }

    @Override
    public void completed(AsynchronousSocketChannel socketChannel,
                          SessionState sessionState) {

        System.out.println("Accept Handler running on "+Thread.currentThread().getName());
        // accept the next connection
        SessionState newSessionState = new SessionState();
        listener.accept(newSessionState, this);

        // handle this connection
        ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
        ReadCompletionHandler readCompletionHandler =
                new ReadCompletionHandler(socketChannel, inputBuffer);
        socketChannel.read(
                inputBuffer, sessionState, readCompletionHandler);
    }

    @Override
    public void failed(Throwable exc, SessionState sessionState) {
        // Handle connection failure...
    }

}

ReadCompletionHandler.java

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ReadCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private AsynchronousSocketChannel socketChannel;
    private ByteBuffer inputBuffer;

    public ReadCompletionHandler(
            AsynchronousSocketChannel socketChannel,
            ByteBuffer inputBuffer) {
        this.socketChannel = socketChannel;
        this.inputBuffer = inputBuffer;
    }

    @Override
    public void completed(
            Integer bytesRead, SessionState sessionState) {

        System.out.println("Read Handler running on "+Thread.currentThread().getName());

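        // A negative count means the client closed the connection,
        // so there is nothing to echo
        if (bytesRead < 0) {
            try {
                socketChannel.close();
            } catch (java.io.IOException e) {
                e.printStackTrace();
            }
            return;
        }

        // Flip the buffer so the bytes just read can be consumed from the start
        inputBuffer.flip();
        byte[] buffer = new byte[inputBuffer.remaining()];
        inputBuffer.get(buffer);
        String message = new String(buffer);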

//        System.out.println("Received message from client : " + message);

//        message = GetRequestParser.getHTTPRequest(message, "200 OK");

        // Echo the message back to client
        WriteCompletionHandler writeCompletionHandler =
                new WriteCompletionHandler(socketChannel);

        ByteBuffer outputBuffer = ByteBuffer.wrap(message.getBytes());

        socketChannel.write(
                outputBuffer, sessionState, writeCompletionHandler);
    }

    @Override
    public void failed(Throwable exc, SessionState attachment) {
        //Handle read failure.....
    }

}

WriteCompletionHandler.java

import java.io.IOException;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class WriteCompletionHandler implements
        CompletionHandler<Integer, SessionState> {

    private AsynchronousSocketChannel socketChannel;

    public WriteCompletionHandler(
            AsynchronousSocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    @Override
    public void completed(
            Integer bytesWritten, SessionState attachment) {
        try {
            System.out.println("Write Handler running on "+Thread.currentThread().getName());
            System.out.println("\n");
            socketChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable exc, SessionState attachment) {
        // Handle write failure.....
    }

}

SessionState.java

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SessionState {

    private Map<String, String> sessionProps =
            new ConcurrentHashMap<>();

    public String getProperty(String key) {
        return sessionProps.get(key);
    }

    public void setProperty(String key, String value) {
        sessionProps.put(key, value);
    }

}

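To check the threading behavior, I print the name of the thread each handler runs on to System.out.

Below are the different results I got when sending several requests to the server, one after another.

Request 1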

Accept Handler running on Thread-4
Read Handler running on Thread-4
Write Handler running on Thread-4

Request 2

Accept Handler running on Thread-4
Read Handler running on Thread-2
Write Handler running on Thread-2

Request 3

Accept Handler running on Thread-5
Read Handler running on Thread-3
Write Handler running on Thread-3

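Judging by the results above, the server seems to use different threads for different requests. In addition, for a given request, the Read Handler and the Write Handler both run on the same thread.

Can someone explain this result? How are the handlers scheduled onto different threads?

Tags: java, multithreading, nio, nio2

Answer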


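As the output of Thread.currentThread().getName() in each completion handler shows, in NIO 2 (the Proactor pattern) the assignment of threads to the different completion handlers is unspecified and appears random. The best practice is therefore not to assume any particular threading behavior.

For completeness, I describe the threading behavior of NIO below.

In NIO, every activity (socket accept, read, or write) runs on a single thread: the one in which the selector loop runs.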
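
To make the source of those threads more visible, here is a minimal sketch. Completion handlers are invoked by threads belonging to the channel's AsynchronousChannelGroup, and channels opened without an explicit group are bound to a shared default group. The sketch binds a listener to its own fixed-size group with named threads, so the thread that runs the accept handler can be recognized. The class name ChannelGroupDemo, the "echo-io-" name prefix, the pool size of 2, the loopback address and the blocking test client are all assumptions made for the illustration; they are not part of the question's code.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ChannelGroupDemo {

    // Accepts one loopback connection and returns the name of the thread
    // that invoked the accept completion handler.
    static String acceptHandlerThreadName() {
        AtomicInteger counter = new AtomicInteger();
        AsynchronousChannelGroup group = null;
        try {
            // Two pooled threads with recognizable (hypothetical) names.
            group = AsynchronousChannelGroup.withFixedThreadPool(2, runnable -> {
                Thread t = new Thread(runnable, "echo-io-" + counter.incrementAndGet());
                t.setDaemon(true);
                return t;
            });
            // Port 0 lets the operating system pick a free port.
            try (AsynchronousServerSocketChannel listener =
                         AsynchronousServerSocketChannel.open(group)
                                 .bind(new InetSocketAddress("127.0.0.1", 0))) {
                CompletableFuture<String> threadName = new CompletableFuture<>();
                listener.accept(null,
                        new CompletionHandler<AsynchronousSocketChannel, Void>() {
                            @Override
                            public void completed(AsynchronousSocketChannel ch, Void att) {
                                threadName.complete(Thread.currentThread().getName());
                                try { ch.close(); } catch (IOException ignored) { }
                            }

                            @Override
                            public void failed(Throwable exc, Void att) {
                                threadName.completeExceptionally(exc);
                            }
                        });
                // Blocking client, used only to trigger the pending accept.
                try (Socket client = new Socket()) {
                    client.connect(listener.getLocalAddress());
                    return threadName.get(5, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (group != null) {
                try { group.shutdownNow(); } catch (IOException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("Accept handler ran on " + acceptHandlerThreadName());
    }
}
```

Which of the pooled threads runs a given handler is still up to the implementation, so the advice above stands: a handler should not rely on running on any specific thread, even when the group is chosen explicitly.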
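
For contrast, the single-threaded selector loop described above can be sketched roughly as follows. This is an illustration under stated assumptions, not a production server: the class name SelectorEchoDemo, the method serveOneClient, the loopback address and the helper client thread are all hypothetical, the loop stops after one client, and it expects a non-empty message. It records which thread handled each event so the single-thread behavior can be observed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SelectorEchoDemo {

    // Runs a selector loop on the calling thread, echoes one non-empty message
    // from one client, and returns a log of which thread handled each event.
    static List<String> serveOneClient(String message) {
        List<String> events = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // port 0: any free port
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            // Hypothetical test client: sends the message, then drains the echo
            // until the server closes the connection.
            Thread client = new Thread(() -> {
                try (Socket s = new Socket("127.0.0.1", port)) {
                    s.getOutputStream().write(message.getBytes(StandardCharsets.UTF_8));
                    s.getOutputStream().flush();
                    while (s.getInputStream().read() != -1) { /* drain */ }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            client.start();

            boolean done = false;
            while (!done) {
                selector.select(); // blocks until at least one channel is ready
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    SelectionKey key = keys.next();
                    keys.remove();
                    if (key.isAcceptable()) {
                        events.add("accept on " + Thread.currentThread().getName());
                        SocketChannel channel = server.accept();
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        events.add("read on " + Thread.currentThread().getName());
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(2048);
                        if (channel.read(buffer) > 0) {
                            buffer.flip();
                            // Simplification: a real server would register
                            // OP_WRITE instead of spinning on partial writes.
                            while (buffer.hasRemaining()) {
                                channel.write(buffer);
                            }
                            events.add("write on " + Thread.currentThread().getName());
                        }
                        channel.close();
                        done = true;
                    }
                }
            }
            client.join();
            return events;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        serveOneClient("ping").forEach(System.out::println);
    }
}
```

Every entry in the returned log names the thread that called serveOneClient, because the accept, the read and the write all happen inside the same loop.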

