java - 设置客户端套接字和服务器套接字侦听器 (Java)
问题描述
我正在尝试在 java 中建立对等连接。
我正在尝试设置我的程序以侦听传入连接,同时向外连接到不同的客户端。
如何实例化我的套接字连接:socketConnection
作为连接到程序的任何内容。理想情况下是这样的:
if(socketConnection.isConnectedToExternalPeer()){
//do stuff
} else if (socketConnection.hasAnIncomingConnection()){
//do stuff
}
在咨询了@L.Spillner 的解决方案后,我将以下代码放在了下面,唯一的问题是我不太明白如何接受连接,这从我尝试设置向上流程序在等待对等方的回复时进入循环:
public class Client implements AutoCloseable {
// Any other ThreadPool can be used as well
private ExecutorService cachedExecutor = null;
private ExecutorService singleThreadExecutor = null;
// port this client shall listen on
private int port = 0;
// Name of the client
private String name = null;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket = null;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener = null;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
private ObjectInputStream inputStream = null;
private ObjectOutputStream outputStream = null;
private BloomChain bloomChain = null;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.bloomChain = new BloomChain();
this.cachedExecutor = Executors.newCachedThreadPool();
this.singleThreadExecutor = Executors.newSingleThreadExecutor();
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( this.port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !this.isConnected )
{
this.listener = createListeningSocket();
this.acceptedSocket = this.cachedExecutor.submit( new ServAccept( this.listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
try
{ System.out.println(host);
System.out.println(targetport);
this.activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
setUpStreams(this.activeConenctionSocket);
this.isConnected = true;
System.out.println(InetAddress.getAllByName(host));
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
this.listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
public void setUpStreams(Socket socket) throws IOException {
this.outputStream = new ObjectOutputStream(socket.getOutputStream());
this.outputStream.flush();
this.inputStream = new ObjectInputStream(socket.getInputStream());
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
public void sendMessage(String message){
if(bloomChain.size()<1){
bloomChain.addBlock(new Block(message, "0"));
} else {
bloomChain.addBlock(new Block(message, bloomChain.get(bloomChain.size()-1).getPreviousHash()));
}
try {
this.outputStream.writeObject(bloomChain);
this.outputStream.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public String mineMessage(){
final String[] receivedMessage = {null};
final Block tempBlock = this.bloomChain.get(this.bloomChain.size()-1);
this.singleThreadExecutor.submit(()->{
tempBlock.mineBlock(bloomChain.getDifficulty());
receivedMessage[0] = tempBlock.getData();
});
return receivedMessage[0];
}
public String dataListener(){
if(isConnected) {
try {
BloomChain tempChain = (BloomChain) this.inputStream.readObject();
if (tempChain.isChainValid()) {
this.bloomChain = tempChain;
return mineMessage();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return null;
}
public ServerSocket getListener() {
return this.listener;
}
public boolean isConnected(){
return isConnected;
}
public ObjectOutputStream getOutputStream(){
return this.outputStream;
}
public ObjectInputStream getInputStream(){
return this.inputStream;
}
}
编辑2:
我试图等待acceptedSocket.get()
在单独的线程中返回一个套接字,如下所示:
new Thread(()->{
setupStreams(this.acceptedSocket.get());
//try-catch blocks omitted
}).start();
这成功等待acceptedSocket
返回连接的套接字但是当我尝试连接到另一个本地运行的客户端时,我收到以下错误:java.net.SocketException: socket closed
解决方案
好吧,经过一番修修补补,我终于想出了一个巧妙的小解决方案:
我们希望能够同时监听和连接,所以我们需要一个ServerSocket
并发出一个ServerSocket#accept
调用来接受传入的连接。
然而,这个方法阻塞了线程,所以为了能够继续我们的程序,我们必须将此调用外包给另一个线程,幸运的是,默认的 Java API 确实提供了一种简单的方法来做到这一点。
以下代码示例尚未完成,但提供了核心功能:
客户端.java:
public class Client
implements AutoCloseable
{
// Any other ThreadPool can be used as well
private ExecutorService es = Executors.newCachedThreadPool();
// port this client shall listen on
private int port;
// Name of the client
private String name;
// indicates that a connection is ongoing
private boolean isConnected = false;
// the socket the Client is currently connected with
private Socket activeConenctionSocket;
// The ServerSocket which will be listening for any incoming connection
private ServerSocket listener;
// The socket which has been accepted by the ServerSocket
private Future<Socket> acceptedSocket;
/**
* @param port Port number by which this client shall be accessed.
* @param name The name of this Client.
*/
public Client( int port, String name )
{
this.port = port;
this.name = name;
this.listener = createListeningSocket();
startListening();
}
private ServerSocket createListeningSocket()
{
ServerSocket temp = null;
try
{
temp = new ServerSocket( port );
}
catch ( IOException e )
{
e.printStackTrace();
}
return temp;
}
private void startListening()
{
if ( !isConnected )
{
listener = createListeningSocket();
acceptedSocket = es.submit( new ServAccept( listener ) );
}
}
/**
* Attempts to connect to any other socket specified by the hostname and the targetport.
*
* @param host The hostname of the target to connect.
* @param targetport The port of the target.
*/
public void connect( String host, int targetport )
{
isConnected = true;
try
{
activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
}
catch ( IOException e )
{
e.printStackTrace();
}
try
{
listener.close();
}
catch ( IOException e )
{
// this will almost certainly throw an exception but it is intended.
}
}
@Override
public void close() throws Exception
{
// close logic (can be rather nasty)
}
}
让我们逐步了解如何实例化一个新的 Client 对象:
- 当我们实例化我们的对象时,我们会创建一个新的 ServerSocket
- 我们通过创建一个
Callable<V>
对象的新线程来开始侦听,我已将其命名ServAccept
为示例目的。 - 现在我们有一个
Future<T>
对象,如果任何连接被接受,它将包含一个套接字。
该方法的一个积极的副作用startListening()
是,您可以将其公开并在连接断开时再次调用它。
该conenct(...)
方法的工作方式几乎与您的setupConnection()
方法相同,但略有不同。仍在另一个线程中侦听的 ServerSocket 将关闭。这样做的原因是,没有其他方法可以退出accept()
另一个线程卡在的方法。
最后一件事(您必须弄清楚)是何时检查 Future 对象是否已经完成。
ServAccept.java
public class ServAccept
implements Callable<Socket>
{
ServerSocket serv;
public ServAccept( ServerSocket sock )
{
this.serv = sock;
}
@Override
public Socket call() throws Exception
{
return serv.accept();
}
}
编辑:
事实上,我不得不承认我的方法可能不是一个非常全面的方法,所以我决定改变一些东西。这一次,我决定不使用未来对象,而是使用事件/自定义 EventListener,它只是坐在那里并监听连接以接收。我测试了连接功能,它工作得很好,但我还没有实现一个解决方案来确定客户端是否真的连接到对等点。我只是确保客户端一次只能保持一个连接。
变化:
服务器接受.java
import java.io.IOException;
import java.net.ServerSocket;
public class ServAccept implements Runnable
{
private ServerSocket serv;
private ConnectionReceivedListener listener;
public ServAccept( ServerSocket sock,ConnectionReceivedListener con )
{
this.serv = sock;
this.listener = con;
}
@Override
public void run()
{
try
{
listener.onConnectionReceived( new ConnectionReceivedEvent( serv.accept() ) );
} catch (IOException e)
{
// planned exception here.
}
}
}
不再实现Callable<V>
,但Runnable
该更改的唯一原因是我们不再等待任何返回,因为我们将使用侦听器和一些多汁的事件。无论如何,为了做到这一点,我们需要创建一个监听器并将其传递给这个对象。但首先我们应该看一下监听器/事件结构:
ConnectionReceivedListener.java
import java.util.EventListener;
@FunctionalInterface
public interface ConnectionReceivedListener extends EventListener
{
public void onConnectionReceived(ConnectionReceivedEvent event);
}
只是我们构建一些匿名类或 lambda 表达式的简单接口。没什么好想的。它甚至不需要扩展EventListener
接口,但我喜欢这样做来提醒我类的目的是什么。
ConnectionReceivedEvent.java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class ConnectionReceivedEvent
{
private Socket accepted;
public ConnectionReceivedEvent( Socket sock )
{
this.accepted = sock;
}
public Socket getSocket()
{
return accepted;
}
public OutputStream getOutput() throws IOException
{
return accepted.getOutputStream();
}
public InputStream getInput() throws IOException
{
return accepted.getInputStream();
}
public int getPort()
{
return accepted.getPort();
}
}
也没什么好想的,只是传递一个 Socket 作为构造函数参数并定义一些 getter,其中大多数不会在本示例中使用。
但是我们现在如何使用它呢?
private void startListening()
{
if (!isConnected)
{
closeIfNotNull();
listener = createListeningSocket();
es.execute( new ServAccept( listener, event -> setAccepted( event.getSocket() ) ) );
}
}
private void setAccepted( Socket socket )
{
if (!isConnected)
{
this.activeConenctionSocket = socket;
setUpStreams( socket );
} else
{
sendError( socket );
}
}
我们仍然使用我们的ExecutorService
并使用该类创建一个新线程ServAccept
。但是,由于我们不希望有任何回报,所以我从 更改ExecutorService#submit
为ExecutorService#execute
(只是意见和品味的问题)。
但是ServAccept
现在需要两个参数。要使用的 ServerSocket 和 Listener。幸运的是,我们可以使用匿名类,并且由于我们的 Listener 仅具有一种方法,我们甚至可以使用 lambda 表达式。event -> setAccepted(event.getSocket())
.
作为对您第二次编辑的回答:我犯了一个逻辑错误。不是该ServerSocket#close
方法在中断调用时会引发异常ServerSocket#accept
,而是accept()
调用本身会引发异常。换句话说,你得到的异常是故意的,我错误地压制了另一个。
推荐阅读
- r - 在循环中计算行和的条件相关性
- python-3.x - 如何使大型 Python 包模块化?
- javascript - 使用 Lodash/Javascript 的过滤器获取过滤后的数据
- flutter - 出现错误:SPAN_EXCLUSIVE_EXCLUSIVE 跨度的长度不能为零
- c# - 如何使用 ParseExact 从字符串中解析日期?
- function - 数据洞察 - 自动聚合功能
- integromat - 如何在其中写入带有空格的参数名称?自定义 Integromat 应用程序
- c# - C#中的Helix Toolkit显示层点云
- c - 与 C 中的 fgets 和 getchar 相关的问题
- c - 尝试在自定义函数中传递一个int数组,在自定义函数中获取不到数组的内容