首页 > 解决方案 > 设置客户端套接字和服务器套接字侦听器 (Java)

问题描述

我正在尝试在 java 中建立对等连接。

我正在尝试设置我的程序以侦听传入连接,同时向外连接到不同的客户端。

如何实例化我的套接字连接:socketConnection作为连接到程序的任何内容。理想情况下是这样的:

if(socketConnection.isConnectedToExternalPeer()){
//do stuff
} else if (socketConnection.hasAnIncomingConnection()){
//do stuff
}

在咨询了@L.Spillner 的解决方案后,我将以下代码放在了下面,唯一的问题是我不太明白如何接受连接,这从我尝试设置向上流程序在等待对等方的回复时进入循环:

public class Client implements AutoCloseable {

    // Any other ThreadPool can be used as well
    private ExecutorService cachedExecutor = null;
    private ExecutorService singleThreadExecutor = null;
    // port this client shall listen on
    private int port = 0;

    // Name of the client
    private String name = null;

    // indicates that a connection is ongoing
    private boolean isConnected = false;

    // the socket the Client is currently connected with
    private Socket activeConenctionSocket = null;

    // The ServerSocket which will be listening for any incoming connection
    private ServerSocket listener = null;

    // The socket which has been accepted by the ServerSocket
    private Future<Socket> acceptedSocket;


    private ObjectInputStream inputStream = null;


    private ObjectOutputStream outputStream = null;

    private BloomChain bloomChain = null;


    /**
     * @param port Port number by which this client shall be accessed.
     * @param name The name of this Client.
     */
    public Client( int port, String name )
    {
        this.port = port;
        this.name = name;
        this.bloomChain = new BloomChain();

        this.cachedExecutor = Executors.newCachedThreadPool();
        this.singleThreadExecutor = Executors.newSingleThreadExecutor();

        this.listener = createListeningSocket();
        startListening();
    }

    private ServerSocket createListeningSocket()
    {

        ServerSocket temp = null;
        try
        {
            temp = new ServerSocket( this.port );

        }
        catch ( IOException e )
        {
            e.printStackTrace();
        }

        return temp;
    }

    private void startListening()
    {
        if ( !this.isConnected )
        {
            this.listener = createListeningSocket();
            this.acceptedSocket = this.cachedExecutor.submit( new ServAccept( this.listener ) );

        }
    }



    /**
     * Attempts to connect to any other socket specified by the hostname and the targetport.
     *
     * @param host The hostname of the target to connect.
     * @param targetport The port of the target.
     */
    public void connect( String host, int targetport )
    {

        try
        {   System.out.println(host);
            System.out.println(targetport);
            this.activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );

            setUpStreams(this.activeConenctionSocket);

            this.isConnected = true;

            System.out.println(InetAddress.getAllByName(host));
        }
        catch ( IOException e )
        {
            e.printStackTrace();
        }

        try
        {
            this.listener.close();
        }
        catch ( IOException e )
        {
            // this will almost certainly throw an exception but it is intended.
        }
    }

    public void setUpStreams(Socket socket) throws IOException {
        this.outputStream = new ObjectOutputStream(socket.getOutputStream());
        this.outputStream.flush();
        this.inputStream = new ObjectInputStream(socket.getInputStream());
    }

    @Override
    public void close() throws Exception
    {
        // close logic (can be rather nasty)
    }

    public void sendMessage(String message){

        if(bloomChain.size()<1){
            bloomChain.addBlock(new Block(message, "0"));
        } else {
            bloomChain.addBlock(new Block(message, bloomChain.get(bloomChain.size()-1).getPreviousHash()));
        }
        try {

            this.outputStream.writeObject(bloomChain);
            this.outputStream.flush();

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public String mineMessage(){
        final String[] receivedMessage = {null};
        final Block tempBlock = this.bloomChain.get(this.bloomChain.size()-1);

        this.singleThreadExecutor.submit(()->{
            tempBlock.mineBlock(bloomChain.getDifficulty());
            receivedMessage[0] = tempBlock.getData();

        });

        return receivedMessage[0];
    }

    public String dataListener(){
        if(isConnected) {
            try {
                BloomChain tempChain = (BloomChain) this.inputStream.readObject();
                if (tempChain.isChainValid()) {
                    this.bloomChain = tempChain;
                    return mineMessage();
                }

            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        return null;
    }

    public ServerSocket getListener() {
        return this.listener;
    }

    public boolean isConnected(){
        return isConnected;
    }

    public ObjectOutputStream getOutputStream(){
        return this.outputStream;
    }

    public ObjectInputStream getInputStream(){
        return this.inputStream;
    }



}

编辑2: 我试图等待acceptedSocket.get()在单独的线程中返回一个套接字,如下所示:

new Thread(()->{
    setupStreams(this.acceptedSocket.get());
    //try-catch blocks omitted 
}).start();

这成功等待acceptedSocket返回连接的套接字但是当我尝试连接到另一个本地运行的客户端时,我收到以下错误:java.net.SocketException: socket closed

标签: javasocketsp2p

解决方案


好吧,经过一番修修补补,我终于想出了一个巧妙的小解决方案:

我们希望能够同时监听和连接,所以我们需要一个ServerSocket并发出一个ServerSocket#accept调用来接受传入的连接。
然而,这个方法阻塞了线程,所以为了能够继续我们的程序,我们必须将此调用外包给另一个线程,幸运的是,默认的 Java API 确实提供了一种简单的方法来做到这一点。

以下代码示例尚未完成,但提供了核心功能:

客户端.java:

public class Client
    implements AutoCloseable
{
  // Any other ThreadPool can be used as well
  private ExecutorService es = Executors.newCachedThreadPool();

  // port this client shall listen on
  private int port;

  // Name of the client
  private String name;

  // indicates that a connection is ongoing
  private boolean isConnected = false;

  // the socket the Client is currently connected with
  private Socket activeConenctionSocket;

  // The ServerSocket which will be listening for any incoming connection
  private ServerSocket listener;

  // The socket which has been accepted by the ServerSocket
  private Future<Socket> acceptedSocket;

  /**
   * @param port Port number by which this client shall be accessed.
   * @param name The name of this Client.
   */
  public Client( int port, String name )
  {
    this.port = port;
    this.name = name;
    this.listener = createListeningSocket();
    startListening();
  }

  private ServerSocket createListeningSocket()
  {

    ServerSocket temp = null;
    try
    {
      temp = new ServerSocket( port );
    }
    catch ( IOException e )
    {
      e.printStackTrace();
    }

    return temp;
  }

  private void startListening()
  {
    if ( !isConnected )
    {
      listener = createListeningSocket();
      acceptedSocket = es.submit( new ServAccept( listener ) );
    }
  }

  /**
   * Attempts to connect to any other socket specified by the hostname and the targetport.
   * 
   * @param host The hostname of the target to connect.
   * @param targetport The port of the target.
   */
  public void connect( String host, int targetport )
  {
    isConnected = true;
    try
    {
      activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
    }
    catch ( IOException e )
    {
      e.printStackTrace();
    }

    try
    {
      listener.close();
    }
    catch ( IOException e )
    {
      // this will almost certainly throw an exception but it is intended.
    }
  }

  @Override
  public void close() throws Exception
  {
    // close logic (can be rather nasty)
  }
}

让我们逐步了解如何实例化一个新的 Client 对象:

  1. 当我们实例化我们的对象时,我们会创建一个新的 ServerSocket
  2. 我们通过创建一个Callable<V>对象的新线程来开始侦听,我已将其命名ServAccept为示例目的。
  3. 现在我们有一个Future<T>对象,如果任何连接被接受,它将包含一个套接字。

该方法的一个积极的副作用startListening()是,您可以将其公开并在连接断开时再次调用它。

conenct(...)方法的工作方式几乎与您的setupConnection()方法相同,但略有不同。仍在另一个线程中侦听的 ServerSocket 将关闭。这样做的原因是,没有其他方法可以退出accept()另一个线程卡在的方法。

最后一件事(您必须弄清楚)是何时检查 Future 对象是否已经完成。

ServAccept.java

public class ServAccept
    implements Callable<Socket>
{
  ServerSocket serv;

  public ServAccept( ServerSocket sock )
  {
    this.serv = sock;
  }

  @Override
  public Socket call() throws Exception
  {
    return serv.accept();
  }

}

编辑:

事实上,我不得不承认我的方法可能不是一个非常全面的方法,所以我决定改变一些东西。这一次,我决定不使用未来对象,而是使用事件/自定义 EventListener,它只是坐在那里并监听连接以接收。我测试了连接功能,它工作得很好,但我还没有实现一个解决方案来确定客户端是否真的连接到对等点。我只是确保客户端一次只能保持一个连接。

变化:

服务器接受.java

import java.io.IOException;
import java.net.ServerSocket;

public class ServAccept implements Runnable
{
    private ServerSocket serv;
    private ConnectionReceivedListener listener;

    public ServAccept( ServerSocket sock,ConnectionReceivedListener con )
    {
        this.serv = sock;
        this.listener = con;
    }

    @Override
    public void run()
    {
        try
        {
            listener.onConnectionReceived( new ConnectionReceivedEvent( serv.accept() ) );
        } catch (IOException e)
        {
            // planned exception here.
        }
    }
}

不再实现Callable<V>,但Runnable该更改的唯一原因是我们不再等待任何返回,因为我们将使用侦听器和一些多汁的事件。无论如何,为了做到这一点,我们需要创建一个监听器并将其传递给这个对象。但首先我们应该看一下监听器/事件结构:

ConnectionReceivedListener.java

import java.util.EventListener;

@FunctionalInterface
public interface ConnectionReceivedListener extends EventListener
{
    public void onConnectionReceived(ConnectionReceivedEvent event);
}

只是我们构建一些匿名类或 lambda 表达式的简单接口。没什么好想的。它甚至不需要扩展EventListener接口,但我喜欢这样做来提醒我类的目的是什么。

ConnectionReceivedEvent.java

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class ConnectionReceivedEvent
{
    private Socket accepted;

    public ConnectionReceivedEvent( Socket sock )
    {
        this.accepted = sock;
    }

    public Socket getSocket()
    {
        return accepted;
    }

    public OutputStream getOutput() throws IOException
    {
        return accepted.getOutputStream();
    }

    public InputStream getInput() throws IOException
    {
        return accepted.getInputStream();
    }

    public int getPort()
    {
        return accepted.getPort();
    }
}

也没什么好想的,只是传递一个 Socket 作为构造函数参数并定义一些 getter,其中大多数不会在本示例中使用。

但是我们现在如何使用它呢?

private void startListening()
{
    if (!isConnected)
    {
        closeIfNotNull();
        listener = createListeningSocket();
        es.execute( new ServAccept( listener, event -> setAccepted( event.getSocket() ) ) );
    }
}

private void setAccepted( Socket socket )
{
    if (!isConnected)
    {
        this.activeConenctionSocket = socket;
        setUpStreams( socket );
    } else
    {
        sendError( socket );
    }
}

我们仍然使用我们的ExecutorService并使用该类创建一个新线程ServAccept。但是,由于我们不希望有任何回报,所以我从 更改ExecutorService#submitExecutorService#execute(只是意见和品味的问题)。
但是ServAccept现在需要两个参数。要使用的 ServerSocket 和 Listener。幸运的是,我们可以使用匿名类,并且由于我们的 Listener 仅具有一种方法,我们甚至可以使用 lambda 表达式。event -> setAccepted(event.getSocket()).

作为对您第二次编辑的回答:我犯了一个逻辑错误。不是该ServerSocket#close方法在中断调用时会引发异常ServerSocket#accept,而是accept()调用本身会引发异常。换句话说,你得到的异常是故意的,我错误地压制了另一个。


推荐阅读