首页 > 解决方案 > Rust TCP 套接字服务器仅使用一个连接

问题描述

我是 Rust 的新手,我正在尝试配置一个简单的 tcp 套接字服务器,它将监听连接并回复收到的相同消息。

问题是,除了与多个客户端连接时,这可以按我的意愿工作。连接的第一个客户端将发送和接收消息,但如果第二个客户端连接,第一个客户端继续工作,但第二个客户端从不接收消息,事实上消息永远不会进入将处理它的代码。如果我断开第一个套接字,服务器将永远开始发送垃圾邮件,从第一个套接字接收到的消息与它发送的最后一条消息的内容相同。

我很确定我的代码做错了,但我找不到

这是我的服务器结构:

use std::collections::HashMap;
use std::io::Read;
use std::io::Write;
use std::net::Shutdown;
use std::net::TcpListener;
use std::net::TcpStream;
use std::str;
use std::sync::{Arc, RwLock};
use threadpool::ThreadPool;

#[derive(Clone, Debug)]
pub struct Server {
    id: Arc<RwLock<u32>>,
    connections: Arc<RwLock<HashMap<u32, TcpStream>>>,
    url: String,
    thread_pool: ThreadPool
}

impl Server {
    pub fn new(url: String) -> Server {
        let server = Server {
            id: Arc::new(RwLock::new(0)),
            connections: Arc::new(RwLock::new(HashMap::new())),
            url,
            thread_pool: ThreadPool::new(10)
        };

        server
    }

    pub fn start(&self) {
        let listener = TcpListener::bind(&self.url).expect("Could not start the server");

        println!("Server started succesfully");

        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let mut self_clone = self.clone();

                    self.thread_pool.execute(move || {
                        self_clone.on_client_connect(stream.try_clone().unwrap());
                    });
                }
                Err(error) => eprintln!("Error when tried to use stream. Error = {:?}", error),
            }
        }
    }

    fn on_client_connect(&mut self, stream: TcpStream) {
        println!("Client connected from {}", stream.local_addr().unwrap());

        let mut id = self.id.write().unwrap();
        {
            *id += 1;
        }

        self.connections
            .write()
            .unwrap()
            .insert(*id, stream.try_clone().unwrap());

        let mut stream = stream.try_clone().unwrap();

        let mut buffer = [0; 1024];

        while match stream.read(&mut buffer) {
            Ok(size) => {
                println!(
                    "Message received from {} - {}",
                    id,
                    str::from_utf8(&buffer).unwrap()
                );

                stream.write_all(&buffer[0..size]).unwrap();

                true
            }
            Err(error) => {
                println!(
                    "Error when reading message from socket. Error = {:?}",
                    error
                );
                stream.shutdown(Shutdown::Both).unwrap();

                false
            }
        } { }
    }
}

在我的 main.rs 中,我只是调用了 connect 函数,服务器开始工作

标签: rust

解决方案


在您的函数中的这段代码中on_client_connect,您正在获取一个读锁 self.id

let mut id = self.id.write().unwrap();
{
    *id += 1;
}

然而,id持有锁的变量直到函数结束时才被释放。这意味着所有其他客户端将等待这个锁被释放,直到当前持有锁的函数完成(当该客户端断开连接时发生)才会发生。

你可以通过重写上面的代码来解决这个问题,只在递增时保持锁,然后将 ID 值存储在一个变量中:

let id: u32 = {
    let mut id_lock = self.id.write.unwrap();
    *id_lock += 1;
    *id_lock

    // id_lock is dropped at the end of this block, so the lock is released
};

更好的是,您可以使用AtomicU32,它仍然是线程安全的,但根本不需要锁定:

use std::sync::atomic::{AtomicU32, Ordering};

struct {
    id: Arc<AtomicU32>,
    // ...
}
// Fetch previous value, then increment `self.id` by one, in a thread-safe and lock-free manner
let id: u32 = self.id.fetch_add(1, Ordering::Relaxed);

此外,当连接关闭时,您的代码将进入无限循环,因为您没有处理返回的情况,这表明连接已关闭:stream.read()Ok(0)

while match stream.read(&mut buffer) {
    Ok(0) => false, // handle connection closed...
    Ok(size) => { /* ... */ }
    Err(err) => { /* ... */ }
} {}

推荐阅读