首页 > 解决方案 > 如何唤醒 OP_WRITE 事件的选择器(java NIO)?

问题描述

我想为 UDP 通道创建一个选择器,以便能够在同一流上读取和写入数据。但是我不知道如何将另一个流中的数据写入之前在选择器中注册的套接字。

我尝试直接使用套接字,但选择器不再阻塞。

我还找到了一种使用 System.in 的方法,但它只适用于控制台。

快速数据:字节 []

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class UdpChannelService extends Thread {
    private static final int KB = 1024;
    private static final int BUFFER_SIZE = 128 * KB;
    public static final String IP_HOST = "localhost";
    public static final int IP_PORT = 4040;

    private DatagramChannel channel;
    private Selector selector;
    private SelectionKey key;
    private ByteBuffer bfreader;
    private ByteBuffer bfwriter;
    private boolean running;
    private boolean autoconnect;
    private SocketAddress serverAddress;

    public UdpChannelService() throws IOException {
        running = false;
        autoconnect = false;
        bfreader = ByteBuffer.allocate(BUFFER_SIZE);
        bfwriter = ByteBuffer.allocate(BUFFER_SIZE);
        selector = Selector.open();
        serverAddress = new InetSocketAddress(IP_HOST, IP_PORT);
        channel = createUdpChannel(selector);
    }

    public static DatagramChannel createUdpChannel(Selector selector) throws IOException {
        DatagramChannel channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        return channel;
    }


    @Override
    public void run() {
        running = true;

        while (running){
            bfreader.clear();
            try {
                //Doi su kien
                selector.select();
                key = selector.selectedKeys().iterator().next();
                selector.selectedKeys().clear();

                if(!key.isValid())
                    continue;

                if(key.isReadable()){
                    //do du lieu
                    SocketAddress address = channel.receive(bfreader);

                    //chuyen doi du lieu
                    bfreader.flip();
                    QuickData data = QuickData.extract(bfreader);

                    System.out.println(data);

                }

                if(key.isWritable()){
                    bfwriter.flip();
                    channel.send(bfwriter, serverAddress);
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void channelWrite(QuickData data) throws UnsupportedEncodingException {
        bfwriter.put(data.toByteBuffer());
        synchronized (selector){
            if (selector.selectedKeys().isEmpty())
                selector.selectedKeys().add(key);
            selector.wakeup();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        UdpChannelService service = new UdpChannelService();

        service.start();

        while (true){
            service.channelWrite(QuickData.create("111","222",new byte[100]));
            Thread.sleep(1000);
        }
    }
}

标签: javasockets

解决方案


推荐阅读