Multithreaded server gives "Cannot access a disposed object" error when multiple clients try to connect

Problem description

I have a multithreaded server and a client, named server.cs and client.cs. The goal of the program is as follows:

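Each client program creates a connection to the server. The server replies with a welcome message (a string). The client then sends the server a JSON-formatted string. The server replies in the same format, with the secret and the END status added. The client then sends a message telling the server to stop the communication, and closes the connection. Once all clients have communicated, a final client with id = -1 tells the server to stop. When the server receives the message from this ending client (id = -1), it must print all the collected information and the number of clients that communicated.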

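Most of this is already done. When I use the SequentialSimulation() method in my client.cs and run many instances of the client.cs program, the server works fine and behaves as described above. However, when I use the ConcurrentSimulation() method in client.cs and run only one instance of client.cs, it crashes with the following error: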

Unhandled exception. System.AggregateException: One or more errors occurred. (Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.)
 ---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'System.Net.Sockets.Socket'.
   at System.Net.Sockets.Socket.get_RemoteEndPoint()
   at SocketClient.Client.endCommunication() in /Users/test/Documents/client/Program.cs:line 143
   at SocketClient.ClientsSimulator.SequentialSimulation() in /Users/test/Documents/client/Program.cs:line 174
   at SocketClient.ClientsSimulator.<ConcurrentSimulation>b__5_0() in /Users/test/Documents/client/Program.cs:line 191
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.<>c.<.cctor>b__274_0(Object obj)
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at SocketClient.ClientsSimulator.ConcurrentSimulation() in /Users/test/Documents/client/Program.cs:line 197
   at SocketClient.Program.Main(String[] args) in /Users/test/Documents/client/Program.cs:line 219

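The goal of ConcurrentSimulation() is to have multiple clients connect to the server without running many instances of the client.cs program (the server should be able to handle 200 clients concurrently, and running 200 instances of client.cs is a lot of work).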

Please help me solve this problem. The code is below.

server.cs:

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading;

namespace SocketServer
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class SequentialServer
    {
        public Socket listener;
        public IPEndPoint localEndPoint;
        // Defining the IP address
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        // Defining the portnumber
        public readonly int portNumber = 11111;

        public String results = "";
        public LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private Boolean stopCond = false;
        private int processingTime = 1000;
        private int listeningQueueSize = 5;

        public void prepareServer()
        {
            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try
            {
                Console.WriteLine("[Server] is ready to start ...");
                // Establish the local endpoint
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                Console.Out.WriteLine("[Server] A socket is established ...");
                // associate a network address to the Server Socket. All clients must know this address
                listener.Bind(localEndPoint);
                // This is a non-blocking listen with max number of pending requests
                listener.Listen(listeningQueueSize);
                while (true)
                {
                    Console.WriteLine("Waiting connection ... ");
                    // Suspend while waiting for incoming connection 
                    Socket connection = listener.Accept();
                    this.sendReply(connection, Message.welcome);

                    stop = false;
                    while (!stop)
                    {
                        numByte = connection.Receive(bytes);
                        data = Encoding.ASCII.GetString(bytes, 0, numByte);
                        replyMsg = processMessage(data);
                        if (replyMsg.Equals(Message.stopCommunication))
                        {
                            stop = true;
                            break;
                        }
                        else
                            this.sendReply(connection, replyMsg);
                    }

                }

            }
            catch (Exception e)
            {
                Console.Out.WriteLine(e.Message);
            }
        }
        public void handleClient(Socket con)
        {
        }
        public string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[Server] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Server] processMessage {0}", e.Message);
            }

            return replyMsg;
        }
        public void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public void exportResults()
        {
            if (stopCond)
            {
                this.printClients();
            }
        }
        public void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[Server] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[Server] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }
    }



    public class ConcurrentServer
    {
        public  static Socket listener;
        public static IPEndPoint localEndPoint;
        public static List<HostInfo> hosts;
        public static IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public static readonly int portNumber = 11111;
        public static LinkedList<ClientInfo> clients = new LinkedList<ClientInfo>();

        private static Boolean stopCond = false;
        private static int processingTime = 1000;
        private static int listeningQueueSize = 5;


        public void prepareServer()
        {


            try
            {
                Console.WriteLine("[ConcurrentServer] is ready to start ...");


                listener = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                listener.Bind(localEndPoint);
                // A list of clients. TODO: change the name of the list
                hosts = new List<HostInfo>();
                Thread listenThread = new Thread(ListenThread);

                listenThread.Start();

                Console.WriteLine("Waiting connection ... ");

            }
            catch (Exception e)
            {
            }


        }

        public static  void ClientInfoThread(object clientSocket)
        {
            // A thread that handles the incoming and outgoing messages.
            Socket cSocket = (Socket) clientSocket;

            byte[] bytes = new Byte[1024];
            String data = null;
            int numByte = 0;
            string replyMsg = "";
            bool stop;

            try

            {
            while (true)
            {
                // Suspend while waiting for incoming connection 
                //Socket connection = listener.Accept();
                Console.WriteLine("Waiting connection ... ");
                sendReply(cSocket, Message.welcome);

                stop = false;
                while (!stop)
                {
                    numByte = cSocket.Receive(bytes);
                    data = Encoding.ASCII.GetString(bytes, 0, numByte);
                    replyMsg = processMessage(data);
                    if (replyMsg.Equals(Message.stopCommunication))
                    {
                        stop = true;
                        break;
                    }
                    else
                        sendReply(cSocket, replyMsg);
                }

            }


            }catch(Exception e){
                //catches exception
            }

        }
        // a thread that adds each client socket  that is trying to connect, in the list.
        public static void ListenThread()
        {
            for( ; ; )
            {
            listener.Listen(0);
            hosts.Add(new HostInfo(listener.Accept()));  
            }
        }
        public  static string processMessage(String msg)
        {
            Thread.Sleep(processingTime);
            Console.WriteLine("[ConcurrentServer] received from the client -> {0} ", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.stopCommunication:
                        replyMsg = Message.stopCommunication;
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        clients.AddLast(c);
                        if (c.clientid == -1)
                        {
                            stopCond = true;
                            exportResults();
                        }
                        c.secret = c.studentnr + Message.secret;
                        c.status = Message.statusEnd;
                        replyMsg = JsonSerializer.Serialize<ClientInfo>(c);
                        break;
                }
            }
            catch (Exception e)
            {
            }

            return replyMsg;
        }
        public static void sendReply(Socket connection, string msg)
        {
            byte[] encodedMsg = Encoding.ASCII.GetBytes(msg);
            connection.Send(encodedMsg);
        }
        public static void exportResults()
        {
            if (stopCond)
            {
                printClients();
            }
        }
        public static void printClients()
        {
            string delimiter = " , ";
            Console.Out.WriteLine("[ConcurrentServer] This is the list of clients communicated");
            foreach (ClientInfo c in clients)
            {
                Console.WriteLine(c.classname + delimiter + c.studentnr + delimiter + c.clientid.ToString());
            }
            Console.Out.WriteLine("[ConcurrentServer] Number of handled clients: {0}", clients.Count);

            clients.Clear();
            stopCond = false;

        }





    }

    public class HostInfo{

        public Socket hostSocket;
        public Thread hostThread;
        public string id;
        public HostInfo()
        {
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }
         public HostInfo(Socket hostSocket)
        {
            this.hostSocket = hostSocket;
            id = Guid.NewGuid().ToString();
            hostThread= new Thread(ConcurrentServer.ClientInfoThread);
            hostThread.Start(hostSocket);

        }

    }



    public class ServerSimulator
    {
        public static void sequentialRun()
        {
            Console.Out.WriteLine("[Server] A sample server, sequential version ...");
            SequentialServer server = new SequentialServer();
            server.prepareServer();
        }
        public static void concurrentRun()

        {
            Console.Out.WriteLine("[ConcurrentServer] A sample server, concurrent version ...");
            ConcurrentServer server = new ConcurrentServer();
            server.prepareServer();
        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
           //ServerSimulator.sequentialRun();
            // todo: uncomment this when the solution is ready.
            ServerSimulator.concurrentRun();
        }

    }
}

client.cs:

using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Threading;

namespace SocketClient
{
    public class ClientInfo
    {
        public string studentnr { get; set; }
        public string classname { get; set; }
        public int clientid { get; set; }
        public string teamname { get; set; }
        public string ip { get; set; }
        public string secret { get; set; }
        public string status { get; set; }
    }

    public class Message
    {
        public const string welcome = "WELCOME";
        public const string stopCommunication = "COMC-STOP";
        public const string statusEnd = "STAT-STOP";
        public const string secret = "SECRET";
    }

    public class Client
    {
        public Socket clientSocket;
        private ClientInfo info;
        public IPEndPoint localEndPoint;
        public IPAddress ipAddress = IPAddress.Parse("127.0.0.1");
        public readonly int portNumber = 11111;
        public readonly int minWaitingTime = 50, maxWaitingTime = 100;
        public int waitingTime = 0;
        string baseStdNumber = "0700";

        private String msgToSend;

        public Client(bool finishing, int n)
        {
            waitingTime = new Random().Next(minWaitingTime, maxWaitingTime);
            info = new ClientInfo();
            info.classname = " INF2X ";
            info.studentnr = this.baseStdNumber + n.ToString();
            info.ip = "127.0.0.1";
            info.clientid = finishing ? -1 : 1;
        }

        public string getClientInfo()
        {
            return JsonSerializer.Serialize<ClientInfo>(info);
        }
        public void prepareClient()
        {
            try
            {
                // Establish the remote endpoint for the socket.
                localEndPoint = new IPEndPoint(ipAddress, portNumber);
                // Creation TCP/IP Socket using  
                clientSocket = new Socket(ipAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] Preparation failed: {0}", e.Message);
            }
        }
        public string processMessage(string msg)
        {
            Console.WriteLine("[Client] from Server -> {0}", msg);
            string replyMsg = "";

            try
            {
                switch (msg)
                {
                    case Message.welcome:
                        replyMsg = this.getClientInfo();
                        break;
                    default:
                        ClientInfo c = JsonSerializer.Deserialize<ClientInfo>(msg.ToString());
                        if (c.status == Message.statusEnd)
                        {
                            replyMsg = Message.stopCommunication;
                        }
                        break;
                }
            }
            catch (Exception e)
            {
                Console.Out.WriteLine("[Client] processMessage {0}", e.Message);
            }
            return replyMsg;
        }
        public void startCommunication()
        {
            Console.Out.WriteLine("[Client] **************");
            Thread.Sleep(waitingTime);
            // Data buffer 
            byte[] messageReceived = new byte[1024];
            int numBytes = 0;
            String rcvdMsg = null;
            Boolean stop = false;
            string reply = "";

            try
            {
                // Connect Socket to the remote endpoint 
                clientSocket.Connect(localEndPoint);
                // print connected EndPoint information  
                Console.WriteLine("[Client] connected to -> {0} ", clientSocket.RemoteEndPoint.ToString());

                while (!stop)
                {
                    // Receive the message using the method Receive().
                    numBytes = clientSocket.Receive(messageReceived);
                    rcvdMsg = Encoding.ASCII.GetString(messageReceived, 0, numBytes);
                    reply = this.processMessage(rcvdMsg);
                    this.sendReply(reply);
                    if (reply.Equals(Message.stopCommunication))
                    {
                        stop = true;
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
        public void sendReply(string msg)
        {
            // Create the message to send
            Console.Out.WriteLine("[Client] Message to be sent: {0}", msg);
            byte[] messageSent = Encoding.ASCII.GetBytes(msg);
            int byteSent = clientSocket.Send(messageSent);
        }
        public void endCommunication()
        {
            Console.Out.WriteLine("[Client] End of communication to -> {0} ", clientSocket.RemoteEndPoint.ToString());
            clientSocket.Shutdown(SocketShutdown.Both);
            clientSocket.Close();
        }
    }

    public class ClientsSimulator
    {
        private int numberOfClients;
        private Client[] clients;
        public readonly int waitingTimeForStop = 2000;


        public ClientsSimulator(int n, int t)
        {
            numberOfClients = n;
            clients = new Client[numberOfClients];
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i] = new Client(false, i);
            }
        }

        public void SequentialSimulation()
        {
            while(true){
            Console.Out.WriteLine("\n[ClientSimulator] Sequential simulator is going to start ...");
            for (int i = 0; i < numberOfClients; i++)
            {
                clients[i].prepareClient();
                clients[i].startCommunication();
                clients[i].endCommunication();
            }

            Console.Out.WriteLine("\n[ClientSimulator] All clients finished with their communications ... ");

           Thread.Sleep(waitingTimeForStop);

            Client endClient = new Client(true, -1);
            endClient.prepareClient();
            endClient.startCommunication();
            endClient.endCommunication();
        }
        }

        public void ConcurrentSimulation()
        {
            Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
            var t = Task.Run(() => SequentialSimulation() );
            var t1 = Task.Run(() => SequentialSimulation() );
            var t2 = Task.Run(() => SequentialSimulation() );
            var t3 = Task.Run(() => SequentialSimulation() );
            var t4= Task.Run(() => SequentialSimulation() );
            var t5 = Task.Run(() => SequentialSimulation() );
            t.Wait();
            t1.Wait();
            t2.Wait();
            t3.Wait();
            t4.Wait();
            t5.Wait();



        }
    }
    class Program
    {
        // Main Method 
        static void Main(string[] args)
        {
            Console.Clear();
            int wt = 5000, nc = 20;
            ClientsSimulator clientsSimulator = new ClientsSimulator(nc, wt);
            //clientsSimulator.SequentialSimulation();
            Thread.Sleep(wt);
            // todo: Uncomment this, after finishing the method.
            clientsSimulator.ConcurrentSimulation();

        }
    }
}

Tags: c#, multithreading, client-server

Solution


You have some race conditions going on.

Look at this:

  public void ConcurrentSimulation()
        {
            Console.Out.WriteLine("[ClientSimulator] Concurrent simulator is going to start ...");
            var t = Task.Run(() => SequentialSimulation() );
            var t1 = Task.Run(() => SequentialSimulation() );
            var t2 = Task.Run(() => SequentialSimulation() );
            var t3 = Task.Run(() => SequentialSimulation() );
            var t4= Task.Run(() => SequentialSimulation() );
            var t5 = Task.Run(() => SequentialSimulation() );
            t.Wait();
            t1.Wait();
            t2.Wait();
            t3.Wait();
            t4.Wait();
            t5.Wait();



        }

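You are running SequentialSimulation several times at once (six tasks, t through t5, in the code above), and every task works on the same shared clients array. For each client, the method calls prepareClient(), which assigns a new Socket object to clientSocket. Because the tasks run concurrently, each client gets one new socket per task, only the most recently assigned one is kept, and the rest of the code in every task then runs against that same socket.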

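This means that at some point one task closes the socket while the other threads keep trying to use the same underlying clientSocket, and you get the error.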
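The failing call in the stack trace is Socket.get_RemoteEndPoint(), reached from endCommunication(). A minimal sketch of that same failure, without any threads or server, might look like the following. The class and method names here (DisposedSocketDemo, ThrowsAfterClose) are made up for illustration and are not part of the question's code.

```csharp
using System;
using System.Net.Sockets;

public static class DisposedSocketDemo
{
    // Returns true if reading RemoteEndPoint after Close() throws
    // ObjectDisposedException, which is the exception in the question's trace.
    public static bool ThrowsAfterClose()
    {
        var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Stands in for another task having already called endCommunication().
        socket.Close();

        try
        {
            _ = socket.RemoteEndPoint;
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ThrowsAfterClose());
    }
}
```

In the question's program the Close() and the later RemoteEndPoint read happen on different tasks, which is why the crash only shows up in ConcurrentSimulation().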

You need each SequentialSimulation to run with its own set of Client objects, and you need to make sure you never try to access a socket after it has been disposed.
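One way this could look is sketched below. It is an assumption-laden illustration rather than a drop-in fix: SimClient is a hypothetical, stripped-down stand-in for the question's Client class, the message exchange is omitted, and a throwaway loopback TcpListener replaces server.cs so the sketch runs on its own. The point it tries to show is that every task creates, uses and closes its own sockets, so no task can see a socket that another task has disposed.

```csharp
using System;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Client class, reduced to socket ownership.
public sealed class SimClient : IDisposable
{
    private readonly Socket socket =
        new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    private bool closed;

    public void Communicate(IPEndPoint server)
    {
        socket.Connect(server);
        // The WELCOME / JSON / COMC-STOP exchange from startCommunication() would go here.
    }

    public void Dispose()
    {
        if (closed) return; // never touch the socket again once it is closed
        closed = true;
        try
        {
            if (socket.Connected) socket.Shutdown(SocketShutdown.Both);
        }
        catch (SocketException)
        {
            // The peer may already have dropped the connection; closing is still safe.
        }
        socket.Close();
    }
}

public static class OwnClientsPerTask
{
    // Each call builds its own clients, so no Socket is shared between tasks.
    public static int RunOneSimulation(IPEndPoint server, int clientCount)
    {
        int completed = 0;
        for (int i = 0; i < clientCount; i++)
        {
            using (var client = new SimClient())
            {
                client.Communicate(server);
                completed++;
            }
        }
        return completed;
    }

    public static int RunConcurrently(IPEndPoint server, int taskCount, int clientsPerTask)
    {
        Task<int>[] tasks = Enumerable.Range(0, taskCount)
            .Select(_ => Task.Run(() => RunOneSimulation(server, clientsPerTask)))
            .ToArray();
        Task.WaitAll(tasks);
        return tasks.Sum(t => t.Result);
    }

    // Starts a loopback listener (an assumption of this sketch, not server.cs),
    // runs the simulation against it and returns the number of finished clients.
    public static int RunWithLoopbackListener(int taskCount, int clientsPerTask)
    {
        var listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        var server = (IPEndPoint)listener.LocalEndpoint;
        int expected = taskCount * clientsPerTask;

        Task acceptLoop = Task.Run(() =>
        {
            for (int i = 0; i < expected; i++)
            {
                listener.AcceptSocket().Close();
            }
        });

        int total = RunConcurrently(server, taskCount, clientsPerTask);
        acceptLoop.Wait();
        listener.Stop();
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(RunWithLoopbackListener(6, 3));
    }
}
```

Applied to the question's code, the equivalent change would be to construct the Client array inside SequentialSimulation() (or pass a separate array to each task) instead of sharing the one created in the ClientsSimulator constructor.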

