首页 > 解决方案 > .netcore 套接字中断,而经典 .net 仍然有效

问题描述

我有一个用经典 .NET Framework 编写的客户端和服务器,它们通过 TCP 套接字进行通信。我还用大量并行连接做过负载测试,一切正常。

但是,同样的代码在 .NET Core 下会出错。在 Linux 上,客户端在尝试从流中读取数据时总是会遇到如下异常:

客户端套接字错误:无法从传输连接读取数据:连接超时。

或者,从服务器读取时返回的字节数也可能为 0。

在 Windows 上,.netcore 客户端中断的频率较低,但有时仍会出现以下错误:

socket error: Unable to read data from the transport connection: 连接尝试失败,因为连接方在一段时间后没有正确响应,或者建立连接失败,因为连接的主机没有响应

顺便说一句,使用的是 .NET Core 3.0。

有人知道为什么会发生这种情况吗?

客户端:

    /// <summary>
    /// A single client-side TCP connection together with the stream and writer built on top of it,
    /// plus a simple "busy" flag that lets a pool hand the connection to one caller at a time.
    /// </summary>
    /// <remarks>
    /// The constructor blocks until the connect attempt has finished. A newly constructed
    /// connection is already marked busy, so the creator owns it until it calls
    /// <see cref="ReleaseLock"/>.
    /// </remarks>
    public class TcpConnection
    {
        object _lock = new object();
        bool _is_busy = false;

        // Written by the connect callback and polled by the constructor on another thread.
        // volatile makes the write visible to the poller and publishes everything the callback
        // assigned before it (stream, bw, ConnError).
        volatile bool _connected;

        /// <summary>Tries to mark the connection as busy.</summary>
        /// <returns><c>true</c> if the caller now owns the connection; <c>false</c> if it was already busy.</returns>
        public bool TakeLock()
        {
            lock (_lock)
            {
                if (_is_busy)
                {
                    return false;
                }
                _is_busy = true;
                return true;
            }
        }

        /// <summary>Marks the connection as free again so another caller can take it.</summary>
        public void ReleaseLock()
        {
            // Use the same monitor as TakeLock so the flag is never read and written concurrently.
            lock (_lock)
            {
                _is_busy = false;
            }
        }

        /// <summary>
        /// <c>true</c> once the connect attempt has completed, whether it succeeded or failed;
        /// check <see cref="ConnError"/> to tell the two apart.
        /// </summary>
        public bool Connected
        {
            get { return _connected; }
            set { _connected = value; }
        }

        /// <summary>Error text set when an asynchronous connect attempt fails; <c>null</c> otherwise.</summary>
        public string ConnError { get; set; }
        public Socket client { get; set; }
        public Stream stream { get; set; }
        public BinaryWriter bw { get; set; }

        /// <summary>Local time of construction or of the last successful use, as set by the owner.</summary>
        public DateTime LastUsed { get; set; }

        /// <summary>Slot index assigned by the owning pool.</summary>
        public int Index { get; set; }

        /// <summary>
        /// Connects to <paramref name="hostname"/>:<paramref name="port"/> and waits for the attempt to finish.
        /// </summary>
        /// <param name="hostname">Textual IP address of the server; it is parsed with <c>IPAddress.Parse</c>, so DNS names are not resolved.</param>
        /// <param name="port">TCP port of the server.</param>
        /// <exception cref="LinqDbException">The connect attempt failed.</exception>
        public TcpConnection(string hostname, int port)
        {
            client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();
                connectEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ConnectedEvent);
                connectEventArg.UserToken = this;
                connectEventArg.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(hostname), port);

                // ConnectAsync returns false when the operation completed synchronously; in that
                // case the Completed event is NOT raised and the result is inspected right here.
                bool pending = client.ConnectAsync(connectEventArg);
                if (!pending)
                {
                    if (connectEventArg.SocketError != SocketError.Success)
                    {
                        #if (VERBOSE)
                        Console.WriteLine("Connection error (immediate)");
                        #endif
                        throw new LinqDbException("Linqdb: Connection error (immediate)");
                    }
                    #if (VERBOSE)
                    Console.WriteLine("Connected immediately");
                    #endif
                    AttachStreams(client);
                    // Keep the flag consistent with the asynchronous path.
                    Connected = true;
                }
                else
                {
                    // Poll until ConnectedEvent reports completion (success or failure).
                    int total_wait_ms = 0;
                    while (!this.Connected)
                    {
                        Thread.Sleep(100);

                        total_wait_ms += 100;
                        #if (VERBOSE)
                        if (total_wait_ms % 2000 == 0)
                        {
                            Console.WriteLine("Can't connect in {0} ms", total_wait_ms);
                        }
                        #endif
                    }
                    if (!string.IsNullOrEmpty(this.ConnError))
                    {
                        throw new LinqDbException(this.ConnError + "  after " + total_wait_ms + " ms wait time");
                    }
                    #if (VERBOSE)
                    Console.WriteLine("Connected {0} ms", total_wait_ms);
                    #endif
                }
            }
            catch
            {
                // The caller never receives this instance when the constructor throws, so nobody
                // else can release the socket handle: dispose it here instead of leaking it.
                client.Dispose();
                throw;
            }
            _is_busy = true;
            LastUsed = DateTime.Now;
        }

        /// <summary>Applies the send/receive timeouts and builds the stream and writer over the socket.</summary>
        private void AttachStreams(Socket connected)
        {
            //connected.NoDelay = true;
            connected.ReceiveTimeout = 60000;
            connected.SendTimeout = 60000;
            this.stream = new NetworkStream(client);
            this.bw = new BinaryWriter(stream);
        }

        /// <summary>Completion callback for an asynchronous connect attempt.</summary>
        private void ConnectedEvent(object sender, SocketAsyncEventArgs e)
        {
            TcpConnection conn = (TcpConnection)e.UserToken;
            if (e.SocketError != SocketError.Success)
            {
                #if (VERBOSE)
                Console.WriteLine("Connection error");
                #endif
                conn.ConnError = "Connection error";
                // Connected means "attempt finished"; it must be the last write so the polling
                // constructor observes ConnError once it sees the flag.
                conn.Connected = true;
                return;
            }

            conn.AttachStreams(e.ConnectSocket);
            conn.ConnError = null;
            conn.Connected = true;
        }
    }

    /// <summary>
    /// Client-side pool of up to <c>_limit</c> <see cref="TcpConnection"/> objects. Each call picks
    /// (or creates) a connection, sends the request and reads the reply from the same socket.
    /// </summary>
    /// <remarks>
    /// NOTE(review): pooled connections are reused without comparing the hostname/port they were
    /// created for with the hostname/port passed to <c>CallServer</c>.
    /// </remarks>
    public class ClientSockets
    {

        // Number of pool slots (size of both cons and _locks).
        const int _limit = 100;
        // Pool slots; a null entry is an empty slot.
        TcpConnection[] cons = new TcpConnection[_limit];
        // Guards the one-time creation of _locks.
        object _lock = new object();
        // One monitor per slot, taken while a new connection is being created for that slot.
        object[] _locks = null;

        /// <summary>
        /// Sends <paramref name="input"/> to the server over a pooled connection and returns the reply.
        /// </summary>
        /// <param name="input">Raw request bytes.</param>
        /// <param name="hostname">Server address, used only when a new connection has to be created.</param>
        /// <param name="port">Server port, used only when a new connection has to be created.</param>
        /// <param name="error_msg">Set to the exception message on failure; <c>null</c> on success.</param>
        /// <returns>
        /// The reply payload, or the four bytes of <c>BitConverter.GetBytes(-1)</c> when creating the
        /// connection or exchanging data failed.
        /// </returns>
        public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
        {
            error_msg = null;
            // Lazily create the per-slot monitors (check, lock, check again).
            if (_locks == null)
            {
                lock (_lock)
                {
                    if (_locks == null)
                    {
                        _locks = new object[_limit];
                        for (int i = 0; i < _limit; i++)
                        {
                            _locks[i] = new object();
                        }
                    }
                }
            }
            TcpConnection conn = null;
            // Keep scanning the pool until a usable connection has been obtained.
            while (true)
            {
                // Highest occupied slot (0 when the pool is empty); new connections are only
                // created in empty slots at or after this index.
                int last_index = 0;
                for (int i = _limit - 1; i >= 0; i--)
                {
                    if (cons[i] != null)
                    {
                        last_index = i;
                        break;
                    }
                }
                for (int i = 0; i < _limit; i++)
                {
                    var tmp = cons[i];
                    if (tmp != null)
                    {
                        // Occupied slot: try to claim the existing connection.
                        var available = tmp.TakeLock();
                        if (!available)
                        {
                            continue;
                        }
                        else
                        {
                            // Connections idle for more than 30 seconds are dropped rather than reused.
                            if ((DateTime.Now - tmp.LastUsed).TotalSeconds > 30)
                            {
                                cons[i] = null;
                                try
                                {
                                    tmp.client.Dispose();
                                    tmp.stream.Dispose();
                                    tmp.bw.Dispose();
                                }
                                catch (Exception ex)
                                {
#if (VERBOSE)
                                    Console.WriteLine("Disposing error:" + ex.Message);
#endif
                                }
                                continue;
                            }
                            else
                            {
                                //ping
                                // NOTE(review): this write is not inside a try block, so an exception
                                // here leaves CallServer while the connection is still marked busy
                                // and still stored in cons[i].
                                tmp.bw.Write(BitConverter.GetBytes(-3));
                                tmp.bw.Flush();

                                // Read until at least the 4-byte pong has arrived.
                                int numBytesRead = 0;
                                var data = new byte[1024];
                                var bad = false;
                                while (numBytesRead < 4)
                                {
                                    int read = 0;
                                    try
                                    {
                                        read = tmp.stream.Read(data, numBytesRead, data.Length - numBytesRead);
                                    }
                                    catch (Exception ex)
                                    {
                                        //server closed connection
                                        bad = true;
                                        break;
                                    }
                                    numBytesRead += read;
                                    if (read <= 0)
                                    {
                                        //server closed connection
                                        bad = true;
                                        break;
                                    }
                                }
                                if (bad)
                                {
                                    cons[i] = null;
                                    try
                                    {
                                        tmp.client.Dispose();
                                        tmp.stream.Dispose();
                                        tmp.bw.Dispose();
                                    }
                                    catch (Exception ex)
                                    {
#if (VERBOSE)
                                    Console.WriteLine("Disposing error:" + ex.Message);
#endif
                                    }
                                    continue;
                                }
                                // Anything other than -3 is treated as a broken connection.
                                var pong = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
                                if (pong != -3)
                                {
                                    cons[i] = null;
                                    try
                                    {
                                        tmp.client.Dispose();
                                        tmp.stream.Dispose();
                                        tmp.bw.Dispose();
                                    }
                                    catch (Exception ex)
                                    {
#if (VERBOSE)
                                    Console.WriteLine("Disposing error:" + ex.Message);
#endif
                                    }
                                    continue;
                                }

                                //socket is ok
                                conn = tmp;
                                break;
                            }

                        }
                    }
                    else
                    {
                        // Empty slot: only slots at or after the highest occupied one are filled.
                        if (i < last_index)
                        {
                            continue;
                        }
                        if (Monitor.TryEnter(_locks[i]))
                        {
                            try
                            {
                                // Another thread may have filled the slot in the meantime.
                                if (cons[i] != null)
                                {
                                    continue;
                                }
                                conn = new TcpConnection(hostname, port);
                                cons[i] = conn;
                                conn.Index = i;
                                break;
                            }
                            catch (Exception ex)
                            {
                                conn = null;
                                cons[i] = null;
                                #if (VERBOSE)
                                Console.WriteLine("Client socket creation error: " + ex.Message);
                                #endif
                                error_msg = ex.Message;
                                return BitConverter.GetBytes(-1);
                            }
                            finally
                            {
                                Monitor.Exit(_locks[i]);
                            }
                        }
                        else
                        {
                            continue;
                        }
                    }
                }
                if (conn == null)
                {
                    // Nothing was available in this pass; wait briefly and scan again.
                    Thread.Sleep(150);
                    continue;
                }
                else
                {
                    break;
                }
            }

            bool error = false;
            try
            {
                // NOTE(review): length is computed but never written to the stream; only the raw
                // input is sent, while the server's Service loop interprets the first four bytes
                // it receives as the payload length.
                var length = BitConverter.GetBytes(input.Length);
                var data = new byte[1024];
                conn.bw.Write(input);
                conn.bw.Flush();

                using (MemoryStream ms = new MemoryStream())
                {
                    int numBytesRead;
                    int total;
                    // Read the 4-byte header, skipping -2 frames.
                    while (true)
                    {
                        numBytesRead = 0;
                        while (numBytesRead < 4)
                        {
                            int read = conn.stream.Read(data, numBytesRead, data.Length - numBytesRead);
                            numBytesRead += read;
                            if (read <= 0)
                            {
                                throw new LinqDbException("Read <= 0: " + read);
                            }
                        }
                        // numBytesRead now counts the bytes that followed the header in the buffer.
                        numBytesRead -= 4;
                        total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
                        if (total == -2)
                        {
                            #if (VERBOSE)
                            Console.WriteLine("PINGER!!!");
                            #endif
                            // NOTE(review): any bytes read after the -2 frame in the same Read call
                            // are dropped, because the next iteration resets numBytesRead to 0.
                            continue;
                        }
                        break;
                    }
                    // Copy the part of the payload that arrived together with the header.
                    if (numBytesRead > 0)
                    {
                        var finput = new byte[numBytesRead];
                        for (int i = 0; i < numBytesRead; i++)
                        {
                            finput[i] = data[4 + i];
                        }
                        ms.Write(finput, 0, numBytesRead);
                    }
                    total -= numBytesRead;
                    // Keep reading until the announced number of payload bytes has been received.
                    while (total > 0)
                    {
                        numBytesRead = conn.stream.Read(data, 0, data.Length);
                        if (numBytesRead <= 0)
                        {
                            throw new LinqDbException("numBytesRead <= 0: " + numBytesRead);
                        }
                        ms.Write(data, 0, numBytesRead);
                        total -= numBytesRead;
                    }
                    conn.LastUsed = DateTime.Now;
                    return ms.ToArray();
                }
            }
            catch (Exception ex)
            {
                #if (VERBOSE)
                Console.WriteLine("Client socket error: " + ex.Message);
                #endif
                error = true;
                error_msg = ex.Message;
                return BitConverter.GetBytes(-1);
            }
            finally
            {
                if (!error)
                {
                    // Success: hand the connection back to the pool.
                    conn.ReleaseLock();
                }
                else
                {
                    // Failure: remove the connection from the pool and dispose it; its busy flag
                    // is intentionally left set.
                    cons[conn.Index] = null;
                    try
                    {
                        conn.client.Dispose();
                        conn.stream.Dispose();
                        conn.bw.Dispose();
                    }
                    catch (Exception ex)
                    {
                        #if (VERBOSE)
                        Console.WriteLine("Disposing error:" + ex.Message);
                        #endif
                    }
                }
            }
        }
    }

服务器:

    /// <summary>
    /// Background keep-alive sender: until <see cref="Done"/> is set it wakes up every two seconds
    /// and, on every fifth wake-up (i.e. every ten seconds), writes the 4-byte value -2 to
    /// <see cref="bw"/>.
    /// </summary>
    class Pinger
    {
        /// <summary>Set to <c>true</c> to stop the loop in <see cref="Do"/>.</summary>
        public bool Done { get; set; }

        /// <summary>Monitor held while writing; other writers to <see cref="bw"/> lock on it too.</summary>
        public object _lock = new object();

        /// <summary>Writer the keep-alive frames are sent through.</summary>
        public BinaryWriter bw { get; set; }

        /// <summary>
        /// Runs the keep-alive loop on the calling thread. Returns when <see cref="Done"/> becomes
        /// <c>true</c> or when anything inside the loop throws.
        /// </summary>
        public void Do()
        {
            const int pollIntervalMs = 2000;
            const int pingIntervalMs = 10000;
            int elapsedMs = 0;
            try
            {
                while (!Done)
                {
                    Thread.Sleep(pollIntervalMs);
                    elapsedMs += pollIntervalMs;
                    if (elapsedMs % pingIntervalMs != 0)
                    {
                        continue;
                    }
                    lock (_lock)
                    {
                        // Re-check under the lock: the owner may have finished in the meantime.
                        if (Done)
                        {
                            continue;
                        }
                        byte[] keepAlive = BitConverter.GetBytes(-2);
                        bw.Write(keepAlive);
                        bw.Flush();
                    }
                }
            }
            catch
            {
                // Best effort: any failure simply ends the pinger.
                return;
            }
        }
    }
    /// <summary>
    /// Server entry point: builds the in-memory indexes, listens on the configured port and serves
    /// each accepted socket in <c>Service</c>.
    /// </summary>
    class ServerSockets
    {
        // Single listening socket shared by all accept operations.
        static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Both values are filled in by CommandHelper.ReadConfig in Main.
        static string db_path = null;
        static int port = 0;
        /// <summary>
        /// Reads the configuration, builds the indexes, starts listening and posts the first
        /// accept; afterwards the main thread only sleeps in a loop to keep the process alive.
        /// </summary>
        public static void Main()
        {
            AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
            CommandHelper.ReadConfig(out db_path, out port);
            var sw = new Stopwatch();
            sw.Start();
            Console.WriteLine("Building in-memory indexes...");
            ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
            sw.Stop();
            Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");

            Console.WriteLine("Listening on " + port);
            listener.Bind(new IPEndPoint(IPAddress.Any, port));
            listener.Listen((int)SocketOptionName.MaxConnections);


            // Post the first accept; when it completes synchronously the Completed event is not
            // raised, so Service is called directly.
            SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
            bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
            if (!willRaiseEvent)
            {
                Service(null, acceptEventArg);
            }

            while (true)
            {
                try
                {
                    Thread.Sleep(60000);
                    #if (VERBOSE)
                    Console.WriteLine("Still kicking...");
                    #endif
                }
                catch (Exception ex)
                {
                    Console.WriteLine("BAD ERROR... " + ex.Message);
                }
            }
        }
        /// <summary>Disposes the server logic when the process exits.</summary>
        static void OnProcessExit(object sender, EventArgs e)
        {
            ServerLogic.Logic.Dispose();
        }
        /// <summary>
        /// Posts the next accept with a fresh <c>SocketAsyncEventArgs</c>; calls <c>Service</c>
        /// directly when the accept completes synchronously.
        /// </summary>
        private static void LoopToStartAccept()
        {
            SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
            acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
            bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
            if (!willRaiseEvent)
            {
                Service(null, acceptEventArg);
            }
        }
        /// <summary>Disposes the socket attached to a failed accept.</summary>
        private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
        {
            #if (VERBOSE)
            Console.WriteLine("bad accept");
            #endif
            // NOTE(review): AcceptSocket is dereferenced without a null check; confirm it is always
            // set when the accept fails.
            acceptEventArgs.AcceptSocket.Dispose();
        }
        /// <summary>
        /// Accept completion handler. Posts the next accept, then serves the accepted socket on the
        /// calling thread until the client disconnects or an error occurs.
        /// </summary>
        /// <param name="sender">Event sender; <c>null</c> when called directly after a synchronous accept.</param>
        /// <param name="e">Accept result carrying the accepted socket.</param>
        private static void Service(object sender, SocketAsyncEventArgs e)
        {
            if (e.SocketError != SocketError.Success)
            {
                LoopToStartAccept();
                HandleBadAccept(e);
                return;
            }

            // Keep accepting other clients while this connection is being served.
            LoopToStartAccept();


            try
            {
                using (Socket soc = e.AcceptSocket)
                {
                    var rg = new Random();
                    #if (VERBOSE)
                    Console.WriteLine("New socket: " + rg.Next(0, 1000000));
                    #endif
                    //soc.NoDelay = true;
                    soc.ReceiveTimeout = 60000;
                    soc.SendTimeout = 60000;
                    using (Stream stream = new NetworkStream(soc))
                    using (BinaryWriter bw = new BinaryWriter(stream))
                    {
                        while (true) //reuse same connection for many commands
                        {
                            byte[] data = new byte[1024];
                            using (MemoryStream ms = new MemoryStream())
                            {
                                // Read until at least the 4-byte header has arrived.
                                int numBytesRead = 0;
                                while (numBytesRead < 4)
                                {
                                    int read = 0;
                                    try
                                    {
                                        read = stream.Read(data, numBytesRead, data.Length - numBytesRead);
                                    }
                                    catch (Exception ex)
                                    {
                                        //client closed connection
                                        return;
                                    }
                                    numBytesRead += read;
                                    if (read <= 0)
                                    {
                                        //throw new Exception("Read <= 0: " + read);
                                        //client closed connection
                                        return;
                                    }
                                }
                                // numBytesRead now counts the bytes that followed the header in the buffer.
                                numBytesRead -= 4;
                                // The first four bytes are interpreted as the payload length, or as
                                // the ping marker -3.
                                var total = BitConverter.ToInt32(new byte[4] { data[0], data[1], data[2], data[3] }, 0);
                                if (total == -3) //ping
                                {
                                    //pong
                                    bw.Write(BitConverter.GetBytes(-3));
                                    bw.Flush();
                                    continue;
                                }
                                // Copy the part of the payload that arrived together with the header.
                                if (numBytesRead > 0)
                                {
                                    var finput = new byte[numBytesRead];
                                    for (int i = 0; i < numBytesRead; i++)
                                    {
                                        finput[i] = data[4 + i];
                                    }
                                    ms.Write(finput, 0, numBytesRead);
                                }
                                total -= numBytesRead;
                                // Keep reading until the announced number of payload bytes has been received.
                                while (total > 0)
                                {
                                    numBytesRead = stream.Read(data, 0, data.Length);
                                    if (numBytesRead <= 0)
                                    {
                                        throw new Exception("numBytesRead <= 0: " + numBytesRead);
                                    }
                                    ms.Write(data, 0, numBytesRead);
                                    total -= numBytesRead;
                                }
                                var input = ms.ToArray();
                                // A pinger is queued on the thread pool for every command; it sends
                                // -2 frames through the same writer while Execute is running.
                                var pinger = new Pinger()
                                {
                                    bw = bw
                                };
                                ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
                                var output = ServerLogic.Logic.Execute(input, db_path);
                                pinger.Done = true;
                                // Take the pinger's lock so the reply cannot interleave with a -2 frame.
                                // NOTE(review): only output is written here, with no 4-byte length in
                                // front of it, although the client reads the first four bytes of the
                                // reply as its length.
                                lock (pinger._lock)
                                {
                                    bw.Write(output);
                                    bw.Flush();
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
#if (VERBOSE)
                Console.WriteLine("Socket error: " + ex.Message);
#endif
                //try
                //{
                //    var rg = new Random();
                //    File.WriteAllText("sock_error_" + rg.Next() + ".txt", ex.Message + " " + ex.StackTrace + (ex.InnerException != null ? (" " + ex.InnerException.Message + " " + ex.InnerException.StackTrace) : ""));
                //}
                //catch (Exception) { }
                return;
            }
            finally
            {
                #if (VERBOSE)
                Console.WriteLine("Listener finally ");
                #endif
            }
        }
    }

编辑:完整的可重现项目:https://github.com/ren85/serverclientbug

编辑:仍然没有解决方案。谁能解决这个问题,我会再追加 500 分悬赏。

编辑:可能相关的问题:https://github.com/dotnet/coreclr/issues/11979 和 https://github.com/dotnet/runtime/issues/364

标签: c#, sockets, .net-core

解决方案


我调试了您的代码,看起来问题出在代码本身的一个 bug,而不是 .NET Core 的套接字实现。从代码来看,接收方期望所发送数据的前四个字节是数据的长度,但您只发送了数据本身,没有发送长度。结果是数据的前四个字节被当作长度来解析,得到的是一个随机的长度值。在某些情况下,这个值比实际数据更大,于是读取数据的 while 循环会一直等待永远不会到达的数据,最终超时。

这是客户端代码中存在问题的部分:

// The payload length is already being prepared here as a 4-byte prefix.
var length = BitConverter.GetBytes(input.Length);
var data = new byte[1024];
// This write is missing from the original code, so the length never gets sent to the server.
conn.bw.Write(length);
conn.bw.Write(input);
conn.bw.Flush();

包含修复的完整客户端(TcpConnection 类沿用问题中的代码):

/// <summary>
/// Client-side pool of up to <c>_limit</c> <see cref="TcpConnection"/> objects. Each call picks
/// (or creates) a connection, sends a length-prefixed request and reads the length-prefixed reply.
/// </summary>
/// <remarks>
/// Wire format, as used by this class: every message is a 4-byte length followed by that many
/// payload bytes. The 4-byte value -3 is a ping/pong probe and -2 is a keep-alive frame that may
/// precede a reply. Pooled connections are not keyed by host/port; the pool assumes one server.
/// </remarks>
public class ClientSockets
{
   // Number of pool slots.
   const int _limit = 100;

   // Pool slots; a null entry is an empty slot.
   readonly TcpConnection[] cons = new TcpConnection[_limit];

   // One monitor per slot, held while a new connection is being created for that slot.
   // Created eagerly so no unsynchronised lazy initialisation is needed.
   readonly object[] _locks = CreateSlotLocks();

   /// <summary>Creates one lock object per pool slot.</summary>
   private static object[] CreateSlotLocks()
   {
      var locks = new object[_limit];
      for (int i = 0; i < _limit; i++)
      {
         locks[i] = new object();
      }
      return locks;
   }

   /// <summary>
   /// Sends <paramref name="input"/> to the server over a pooled connection and returns the reply.
   /// </summary>
   /// <param name="input">Raw request bytes; they are sent with a 4-byte length prefix.</param>
   /// <param name="hostname">Server address, used only when a new connection has to be created.</param>
   /// <param name="port">Server port, used only when a new connection has to be created.</param>
   /// <param name="error_msg">Set to the exception message on failure; <c>null</c> on success.</param>
   /// <returns>
   /// The reply payload, or the four bytes of <c>BitConverter.GetBytes(-1)</c> when creating the
   /// connection or exchanging data failed.
   /// </returns>
   public byte[] CallServer(byte[] input, string hostname, int port, out string error_msg)
   {
      TcpConnection conn = AcquireConnection(hostname, port, out error_msg);
      if (conn == null)
      {
         return BitConverter.GetBytes(-1);
      }

      byte[] response;
      try
      {
         response = Exchange(conn, input);
      }
      catch (Exception ex)
      {
#if (VERBOSE)
         Console.WriteLine("Client socket error: " + ex.Message);
#endif
         // The stream is in an unknown state: drop the connection instead of returning it.
         cons[conn.Index] = null;
         DisposeConnection(conn);
         error_msg = ex.Message;
         return BitConverter.GetBytes(-1);
      }

      conn.ReleaseLock();
      return response;
   }

   /// <summary>
   /// Returns a connection owned by the caller: either a pooled one that answered a ping, or a
   /// newly created one. Returns <c>null</c> (with <paramref name="error_msg"/> set) when creating
   /// a connection failed. Blocks, retrying every 150 ms, while every slot is busy.
   /// </summary>
   private TcpConnection AcquireConnection(string hostname, int port, out string error_msg)
   {
      error_msg = null;
      while (true)
      {
         // Highest occupied slot (0 when the pool is empty); new connections are only created
         // in empty slots at or after this index.
         int last_index = 0;
         for (int i = _limit - 1; i >= 0; i--)
         {
            if (cons[i] != null)
            {
               last_index = i;
               break;
            }
         }

         for (int i = 0; i < _limit; i++)
         {
            var pooled = cons[i];
            if (pooled != null)
            {
               if (!pooled.TakeLock())
               {
                  continue;
               }
               // LastUsed is written with DateTime.Now, so it is compared against DateTime.Now.
               bool fresh = (DateTime.Now - pooled.LastUsed).TotalSeconds <= 30;
               if (fresh && TryPing(pooled))
               {
                  return pooled;
               }
               // Idle for too long or not answering: remove it from the pool.
               cons[i] = null;
               DisposeConnection(pooled);
               continue;
            }

            if (i < last_index)
            {
               continue;
            }
            if (!Monitor.TryEnter(_locks[i]))
            {
               continue;
            }
            try
            {
               // Another thread may have filled the slot in the meantime.
               if (cons[i] != null)
               {
                  continue;
               }
               // A new TcpConnection is constructed already marked busy, so it belongs to us.
               var created = new TcpConnection(hostname, port);
               created.Index = i;
               cons[i] = created;
               return created;
            }
            catch (Exception ex)
            {
               cons[i] = null;
#if (VERBOSE)
               Console.WriteLine("Client socket creation error: " + ex.Message);
#endif
               error_msg = ex.Message;
               return null;
            }
            finally
            {
               Monitor.Exit(_locks[i]);
            }
         }

         // Nothing was available in this pass; wait briefly and scan again.
         Thread.Sleep(150);
      }
   }

   /// <summary>
   /// Probes a pooled connection by sending -3 and expecting -3 back.
   /// </summary>
   /// <returns><c>true</c> when the server answered with a pong; <c>false</c> on any failure.</returns>
   private static bool TryPing(TcpConnection candidate)
   {
      try
      {
         // The write is inside the try as well: on a connection the server has closed it can
         // throw, and that must mark the connection as bad rather than escape to the caller.
         candidate.bw.Write(BitConverter.GetBytes(-3));
         candidate.bw.Flush();

         var reply = new byte[1024];
         int received = 0;
         while (received < 4)
         {
            int read = candidate.stream.Read(reply, received, reply.Length - received);
            if (read <= 0)
            {
               // Server closed the connection.
               return false;
            }
            received += read;
         }
         return BitConverter.ToInt32(reply, 0) == -3;
      }
      catch (Exception)
      {
         // Any I/O failure means the connection is unusable; the caller discards it.
         return false;
      }
   }

   /// <summary>
   /// Sends one length-prefixed request and reads the complete length-prefixed reply.
   /// </summary>
   /// <exception cref="LinqDbException">The server closed the connection before the reply was complete.</exception>
   private static byte[] Exchange(TcpConnection conn, byte[] input)
   {
      conn.bw.Write(BitConverter.GetBytes(input.Length)); // Send the length first.
      conn.bw.Write(input);
      conn.bw.Flush();

      var data = new byte[1024];
      int buffered = 0; // bytes currently held at the start of data
      int total;
      while (true)
      {
         while (buffered < 4)
         {
            int read = conn.stream.Read(data, buffered, data.Length - buffered);
            if (read <= 0)
            {
               throw new LinqDbException("Read <= 0: " + read);
            }
            buffered += read;
         }
         total = BitConverter.ToInt32(data, 0);
         buffered -= 4; // from here on: bytes that followed the 4-byte header
         if (total != -2)
         {
            break;
         }
#if (VERBOSE)
         Console.WriteLine("PINGER!!!");
#endif
         // Keep-alive frame: drop its four bytes but keep whatever was read after it. TCP is a
         // byte stream, so the start of the real reply can arrive in the same Read call.
         Array.Copy(data, 4, data, 0, buffered);
      }

      using (MemoryStream ms = new MemoryStream())
      {
         // Part of the payload may have arrived together with the header.
         if (buffered > 0)
         {
            ms.Write(data, 4, buffered);
         }
         total -= buffered;
         while (total > 0)
         {
            int read = conn.stream.Read(data, 0, data.Length);
            if (read <= 0)
            {
               throw new LinqDbException("numBytesRead <= 0: " + read);
            }
            ms.Write(data, 0, read);
            total -= read;
         }
         conn.LastUsed = DateTime.Now;
         return ms.ToArray();
      }
   }

   /// <summary>Disposes the socket, stream and writer of a connection, ignoring dispose errors.</summary>
   private static void DisposeConnection(TcpConnection dead)
   {
      try
      {
         dead.client.Dispose();
         dead.stream.Dispose();
         dead.bw.Dispose();
      }
      catch (Exception ex)
      {
         // Best effort: the connection is being thrown away anyway.
#if (VERBOSE)
         Console.WriteLine("Disposing error:" + ex.Message);
#endif
      }
   }
}

服务器代码中也存在同样的问题:

// Hand the request bytes and the database path to the server logic; the result is the response payload.
var output = ServerLogic.Logic.Execute(input, db_path);
var length = BitConverter.GetBytes(output.Length); // Response length as raw bytes, used as the length prefix
// NOTE(review): presumably signals the pinger to stop sending keep-alives; Pinger is defined elsewhere — confirm.
pinger.Done = true;
// The whole response is written under pinger._lock.
// NOTE(review): presumably the pinger takes the same lock before writing to bw, so a ping cannot
// land between the length prefix and the payload — confirm against the Pinger class.
lock (pinger._lock)
{
   bw.Write(length); // Send the length prefix before the data
   bw.Write(output);
   bw.Flush();
}

包含修复的完整服务器:

/// <summary>
/// TCP server entry point: accepts connections with <c>Socket.AcceptAsync</c> and serves
/// length-prefixed requests on each connection until the client disconnects.
/// </summary>
/// <remarks>
/// Wire format as read and written by this class: a 4-byte length (the bytes produced by
/// <c>BitConverter.GetBytes(int)</c>) followed by that many payload bytes. A length of -3 is a
/// ping and is answered with -3.
/// </remarks>
class ServerSockets
{
   static Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

   static string db_path = null;
   static int port = 0;

   /// <summary>
   /// Reads the configuration, builds the in-memory indexes, starts listening and then keeps the
   /// process alive while connections are served on other threads.
   /// </summary>
   public static void Main()
   {
      AppDomain.CurrentDomain.ProcessExit += new EventHandler(OnProcessExit);
      CommandHelper.ReadConfig(out db_path, out port);
      var sw = new Stopwatch();
      sw.Start();
      Console.WriteLine("Building in-memory indexes...");
      ServerLogic.Logic.ServerBuildIndexesOnStart(db_path);
      sw.Stop();
      Console.WriteLine("Done building in-memory indexes. It took: " + Math.Round(sw.ElapsedMilliseconds / 60000.0, 0) + " min.");

      Console.WriteLine("Listening on " + port);
      listener.Bind(new IPEndPoint(IPAddress.Any, port));
      listener.Listen((int)SocketOptionName.MaxConnections);

      // Same accept start-up as after every accepted connection; no need to duplicate it here.
      LoopToStartAccept();

      while (true)
      {
         try
         {
            Thread.Sleep(60000);
#if (VERBOSE)
                    Console.WriteLine("Still kicking...");
#endif
         }
         catch (Exception ex)
         {
            Console.WriteLine("BAD ERROR... " + ex.Message);
         }
      }
   }

   /// <summary>Releases the server logic's resources when the process exits.</summary>
   static void OnProcessExit(object sender, EventArgs e)
   {
      ServerLogic.Logic.Dispose();
   }

   /// <summary>
   /// Starts one asynchronous accept whose completion is handled by <see cref="Service"/>.
   /// </summary>
   private static void LoopToStartAccept()
   {
      SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();
      acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(Service);
      bool willRaiseEvent = listener.AcceptAsync(acceptEventArg);
      if (!willRaiseEvent)
      {
         // The accept completed synchronously, so Completed will not be raised. Service blocks for
         // the whole lifetime of the connection, and it calls back into this method first; running
         // it inline would nest the calls and leave the earlier connection unserved until the later
         // one closes. Hand it to the thread pool instead.
         ThreadPool.QueueUserWorkItem(state => Service(null, acceptEventArg));
      }
   }

   /// <summary>Cleans up after an accept operation that finished with a socket error.</summary>
   /// <param name="acceptEventArgs">Arguments of the failed accept; disposed by this method.</param>
   private static void HandleBadAccept(SocketAsyncEventArgs acceptEventArgs)
   {
#if (VERBOSE)
            Console.WriteLine("bad accept");
#endif
      // A failed accept may not carry a socket at all; guard against the null dereference.
      if (acceptEventArgs.AcceptSocket != null)
      {
         acceptEventArgs.AcceptSocket.Dispose();
      }
      acceptEventArgs.Dispose();
   }

   /// <summary>
   /// Reads exactly <paramref name="count"/> bytes into the start of <paramref name="buffer"/>.
   /// </summary>
   /// <returns><c>false</c> if the stream reported end-of-data before all bytes arrived.</returns>
   private static bool TryReadExact(Stream stream, byte[] buffer, int count)
   {
      int filled = 0;
      while (filled < count)
      {
         int read = stream.Read(buffer, filled, count - filled);
         if (read <= 0)
         {
            return false;
         }
         filled += read;
      }
      return true;
   }

   /// <summary>
   /// Completion handler for an accept: starts the next accept, then serves requests on the
   /// accepted socket until the client disconnects or an error occurs.
   /// </summary>
   /// <param name="sender">Event sender; not used (null when invoked directly).</param>
   /// <param name="e">Arguments of the completed accept operation.</param>
   private static void Service(object sender, SocketAsyncEventArgs e)
   {
      if (e.SocketError != SocketError.Success)
      {
         LoopToStartAccept();
         HandleBadAccept(e);
         return;
      }

      // Start the next accept before this thread blocks on the connection below.
      LoopToStartAccept();

      try
      {
         using (Socket soc = e.AcceptSocket)
         {
#if (VERBOSE)
                    var rg = new Random();
                    Console.WriteLine("New socket: " + rg.Next(0, 1000000));
#endif
            //soc.NoDelay = true;
            soc.ReceiveTimeout = 60000;
            soc.SendTimeout = 60000;
            using (Stream stream = new NetworkStream(soc))
            using (BinaryWriter bw = new BinaryWriter(stream))
            {
               byte[] header = new byte[4];
               byte[] data = new byte[1024];
               while (true) //reuse same connection for many commands
               {
                  // Read exactly the 4-byte length so that no byte of the payload, or of a
                  // following message, is consumed together with the header.
                  try
                  {
                     if (!TryReadExact(stream, header, header.Length))
                     {
                        //client closed connection
                        return;
                     }
                  }
                  catch (Exception)
                  {
                     //client closed connection
                     return;
                  }

                  var total = BitConverter.ToInt32(header, 0);
                  if (total == -3) //ping
                  {
                     //pong
                     bw.Write(BitConverter.GetBytes(-3));
                     bw.Flush();
                     continue;
                  }

                  byte[] input;
                  using (MemoryStream ms = new MemoryStream())
                  {
                     while (total > 0)
                     {
                        // Never ask for more than this message still owes; Read may return fewer
                        // bytes than requested, so keep looping until the payload is complete.
                        int numBytesRead = stream.Read(data, 0, Math.Min(data.Length, total));
                        if (numBytesRead <= 0)
                        {
                           throw new EndOfStreamException("numBytesRead <= 0: " + numBytesRead);
                        }
                        ms.Write(data, 0, numBytesRead);
                        total -= numBytesRead;
                     }
                     input = ms.ToArray();
                  }

                  var pinger = new Pinger()
                  {
                     bw = bw
                  };
                  ThreadPool.QueueUserWorkItem(f => { pinger.Do(); });
                  try
                  {
                     var output = ServerLogic.Logic.Execute(input, db_path);
                     var length = BitConverter.GetBytes(output.Length);
                     pinger.Done = true;
                     // The response is written under pinger._lock.
                     // NOTE(review): presumably the pinger writes to bw under the same lock — confirm.
                     lock (pinger._lock)
                     {
                        bw.Write(length);
                        bw.Write(output);
                        bw.Flush();
                     }
                  }
                  finally
                  {
                     // Also mark the pinger as done when Execute or the write throws, so the queued
                     // work item is not left running against a connection that is being torn down.
                     // NOTE(review): assumes Pinger.Do stops once Done is true — confirm.
                     pinger.Done = true;
                  }
               }
            }
         }
      }
      catch (Exception ex)
      {
#if (VERBOSE)
                Console.WriteLine("Socket error: " + ex.Message);
#endif
         return;
      }
      finally
      {
         // The accept arguments are created per accept and are not reused; release them.
         e.Dispose();
#if (VERBOSE)
                Console.WriteLine("Listener finally ");
#endif
      }
   }
}

对我来说,做了这两处更改之后,代码在 Windows 和 Linux 上都可以运行。如果您仍然有问题,还需要提供更多详细信息,例如您发送的是哪些数据。

在您发布 GitHub 示例后,我对 Linux 进行了更多研究。正如我在下面的评论中已经建议的那样,您应该使用 Connect 方法而不是 ConnectAsync,以获得更好的行为。在我对 Raspbian 和 Arch Linux 的测试中,一旦您在客户端打开大量线程,就可能耗尽 ThreadPool 中的所有线程,导致 Connect 完成事件不再触发,一切都超时。由于您的代码本来就是在线程上循环等待,使用 Async 方法无论如何也不会带来多少好处。

这是改为同步连接后的 TcpConnection 类:

/// <summary>
/// A client connection that is opened synchronously in the constructor, together with a simple
/// busy flag (<see cref="TakeLock"/> / <see cref="ReleaseLock"/>) for claiming the connection.
/// </summary>
public class TcpConnection
{
   readonly object _lock = new object();
   bool _is_busy = false;

   /// <summary>Tries to claim the connection.</summary>
   /// <returns><c>true</c> if the connection was free and is now marked busy; <c>false</c> if it was already busy.</returns>
   public bool TakeLock()
   {
      lock (_lock)
      {
         if (_is_busy)
         {
            return false;
         }
         _is_busy = true;
         return true;
      }
   }

   /// <summary>Marks the connection as free again.</summary>
   public void ReleaseLock()
   {
      // Write the flag under the same lock TakeLock reads it with, so a release made on one
      // thread is reliably observed by the next TakeLock on another thread.
      lock (_lock)
      {
         _is_busy = false;
      }
   }

   public bool Connected { get; set; }
   public string ConnError { get; set; }
   public Socket client { get; set; }
   public Stream stream { get; set; }
   public BinaryWriter bw { get; set; }
   public DateTime LastUsed { get; set; }
   public int Index { get; set; }

   /// <summary>
   /// Connects to the given endpoint and prepares the stream and writer. The new connection
   /// starts out busy, i.e. <see cref="TakeLock"/> returns <c>false</c> until <see cref="ReleaseLock"/> is called.
   /// </summary>
   /// <param name="hostname">IP address literal of the server (passed to <c>IPAddress.Parse</c>).</param>
   /// <param name="port">TCP port of the server.</param>
   /// <exception cref="FormatException"><paramref name="hostname"/> is not a valid IP address.</exception>
   /// <exception cref="SocketException">The connection attempt failed.</exception>
   public TcpConnection(string hostname, int port)
   {
      client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

      try
      {
         client.Connect(new IPEndPoint(IPAddress.Parse(hostname), port));
      }
      catch
      {
         // The caller never receives this instance when the constructor throws, so nobody else
         // could dispose the socket; release it here and let the original exception propagate.
         client.Dispose();
         throw;
      }

      if (client.Connected)
      {
#if (VERBOSE)
            Console.WriteLine("Connected immediately");
#endif
         //client.NoDelay = true;
         client.ReceiveTimeout = 60000;
         client.SendTimeout = 60000;
         this.stream = new NetworkStream(client);
         this.bw = new BinaryWriter(stream);
         // Record the outcome; without this the Connected property keeps its default (false)
         // even though the socket is connected.
         Connected = true;
      }
      _is_busy = true;
      LastUsed = DateTime.Now;
   }
}

最后我想说,您的代码非常复杂,而且正如其他人指出的那样,没有遵循最佳实践。我建议您进一步学习多线程和异步编程,然后改进代码,这也会让它运行得更好、更可预测。

以下是有关如何异步使用套接字的 Microsoft 示例的链接以及有关如何进行异步编程的一般文档:

异步客户端套接字示例

异步服务器套接字示例

.NET 中的并行处理、并发和异步编程


推荐阅读