首页 > 解决方案 > 尽管绑定/连接地址匹配,为什么我会间歇性地收到 PortUnreachableException?

问题描述

我有一个小的概念验证,我试图通过不同的端口路由 UDP 流量,而任何一方都不知道端口转换。如果我禁用 connect() 调用并只执行“即发即弃”数据报,我就能够使这项工作正常进行。此外,即使使用 connect() 调用,我的代码也能正常工作,但它似乎会选择特定流量并抛出 PortUnreachableException。

实际的服务器/客户端(我正在路由的流量)可能会双向发送流量。其中一些流量可以正常通过,但有些则不能,并导致无法访问端口错误。我已经能够重现这种有问题的服务器/客户端行为,并且我已经使用名为 run_buggySend() 的简单方法将该实现融入到我的下面的 PoC 代码中,您将在下面看到。

我已经向后和横向查看了代码,但看不到抛出异常的原因。我知道大多数人都使用 bind() 或 connect() 但不是两者都使用,但我的用例需要两者,因为我正在路由的某些协议需要非常特定的端口用于双向流量。此外,虽然我可以摆脱 connect() 调用以使其工作,但我喜欢缓存路由信息以提高性能的想法,并且不明白为什么它不应该兼容。然后没有 bind(),从另一个方向到硬编码端口的流量永远不会到达我的 DatagramChannel。

我还想知道这是否会遇到一些错误的 Java 代码,因为端口不可达异常甚至没有从更相关/正确的 DatagramChannel 抛出。如果我将触发数据报发送到 3000,则 13001 通道会抛出错误。同样,如果我将其发送到 13001,则 3000 通道会引发错误。

也就是说,我将通过下面的最小复制代码。感谢您提供任何提示或帮助!

import java.io.IOException;
import java.math.BigInteger;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ByteChannel;
import java.nio.charset.StandardCharsets;

public class ByteChannelUDPBridge implements Runnable
{
    private final ByteBuffer buff = ByteBuffer.allocateDirect(4096);

    private final ByteChannelUDP channelA;
    private final ByteChannelUDP channelB;

    private final int portBase = 3000;
    private final int portOffset = 10000;

    public ByteChannelUDPBridge() throws IOException
    {
        InetAddress loopback = InetAddress.getLoopbackAddress();

        channelA = new ByteChannelUDP(
            new InetSocketAddress(loopback, portBase),                      //    bind=3000
            new InetSocketAddress(loopback, portBase + 1));                 // connect=3001
        channelB = new ByteChannelUDP(
            new InetSocketAddress(loopback, portOffset + portBase + 1),     //    bind=13001
            new InetSocketAddress(loopback, portOffset + portBase));        // connect=13000
    }

    public void transfer(ByteChannel chanIn, ByteChannel chanOut) throws IOException
    {
        try
        {
            chanIn.read(buff);

            if (buff.position() > 0)
            {
                System.out.print(chanIn + " recieved " + buff.position() + " bytes: " + buff.position() + " bytes: ");

                buff.flip();
                logToHex(StandardCharsets.UTF_8.decode(buff));
                buff.rewind();
                chanOut.write(buff);
                buff.clear();
            }
        }
        catch (PortUnreachableException ex)
        {
            System.err.println(chanIn + ": " + ex);
        }
    }

    public void run()
    {
        try
        {
            while (true)
            {
                transfer(channelA, channelB); // 3000  >> B:13001 -> C:13000
                transfer(channelB, channelA); // 13001 >> B:3000  -> C:3001
                Thread.sleep(1);
            }
        }
        catch (IOException | InterruptedException ex) { ex.printStackTrace(); }
        finally
        {
            try
            {
                channelA.close();
                channelB.close();
            }
            catch (IOException dontCare) { dontCare.printStackTrace(); }
        }
    }

    public void run_buggySend()
    {
        try
        {
            DatagramSocket sock = new DatagramSocket(3001); // 13000
            InetSocketAddress recip = new InetSocketAddress(InetAddress.getLoopbackAddress(), 3000); // 13001
            byte[] buffer = new byte[]{1, 2, 3};

            while (true)
            {
                sock.send(new DatagramPacket(buffer, buffer.length, recip));
                Thread.sleep(15000);
            }
        }
        catch (IOException | InterruptedException ex)
        {
            ex.printStackTrace();
        }
    }

    public void logToHex(CharBuffer arg) { System.out.format("%x\n", new BigInteger(1, arg.toString().getBytes())); }

    public static void main(String[] args) throws Exception
    {
        ByteChannelUDPBridge listener = new ByteChannelUDPBridge();
        Thread t1 = new Thread(listener);
        t1.start();

        Thread t2 = new Thread(listener::run_buggySend);
        t2.start();
    }
}

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.DatagramChannel;

public class ByteChannelUDP implements ByteChannel
{
    private final DatagramChannel channel;

    private final SocketAddress bind, connect;

    public ByteChannelUDP(InetSocketAddress addrBind, InetSocketAddress addrConnect) throws IOException
    {
        bind = addrBind;
        connect = addrConnect;
        channel = DatagramChannel.open();
        channel.configureBlocking(false);
        channel.bind(bind);
        channel.connect(connect);
    }

    @Override
    public int read(ByteBuffer dst) throws IOException
    {
        //return channel.read(dst); // this also causes the unreachable exception
        int before = dst.remaining();
        SocketAddress rec = channel.receive(dst);
        return dst.remaining() - before;
    }

    @Override
    public int write(ByteBuffer src) throws IOException
    {
        //return channel.write(src); // not used unless i can get read() to work
        return channel.send(src, connect);
    }

    @Override
    public boolean isOpen() { return channel.isOpen(); }

    @Override
    public void close() throws IOException { channel.close(); }

    @Override
    public String toString()
    {
        return bind.toString() + " <> " + connect.toString();
    }
}

标签: javasocketsudpniodatagram

解决方案


偶然发现了问题和解决方案。当recieve() /read()方法抛出 时PortUnreachableException,实际上是send()/ write()触发了错误。

一侧的接收器显然没有正确配置并且足够容易修复,因此可以理解中间写入将是错误出现的地方。

然而,由于一些难以理解的原因,Java 认为在方法中提醒调用者注意无法传递的数据报更有意义read(),尽管事实上它是write()失败的。此外,这种行为似乎也没有记录在案。Java 应该在下一次写入时或在单独的“错误轮询”方法中抛出异常,而不是来自与写入操作失败几乎完全无关的读取操作。


推荐阅读