首页 > 解决方案 > 如何测试多线程服务器

问题描述

我有一个主线程服务器,它基本上听任何想连接到端口的人

/**
 * The main server thread receives request from and sends
 * response to clients.
 */
public class Server {

  /*
  Port number for the client connection
   */
  private static final int PORT = 3000;

  /*
  The number of client can be connected
   */
  private static final int SIZE = 10;

  /*
  The executor
   */
  private static ExecutorService executorService = Executors.newFixedThreadPool(SIZE);



  /**
   * Starts the main server.
   */
  public static void main(String[] args) {
    /*
    All the information are stored into the queue
    */
    BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
    /*
    All the clients are stored into the map
    */
    ConcurrentMap<byte[], Boolean> clientManagement = new ConcurrentHashMap<>();
    runMainServer(messageQueue, clientManagement);

  }

  private static void runMainServer(BlockingQueue<Message> messageQueue, ConcurrentMap<byte[], Boolean> clientManagement) {
    try (
        ServerSocket serverSocket = new ServerSocket(PORT);
    ) {
      System.out.println("Starting server");
      while (true) {
        System.out.println("Waiting for request");
        Socket socket = serverSocket.accept();
        System.out.println("Processing request");
        ClientThread newClient = new ClientThread(socket, messageQueue, clientManagement);
        executorService.submit(newClient);
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

}

而且我有许多多线程子服务器来处理每个相同的客户端。客户端首先将被服务器接受,并检查服务器收到的第一条消息是否是 connect_message。如果是,则它们已正式连接。除了connect_message,还有更多的消息。但我只是不会对它们过于具体。

/**
 * The client thread.
 */
public class ClientThread implements Runnable {

  private Socket socket;
  private BlockingQueue<Message> messageQueue;
  private ConcurrentMap<byte[], Boolean> clientManagement;
  private byte[] username;

  /**
   *
   *
   * @param socket
   * @param messageQueue
   * @param clientManagement
   */
  public ClientThread(Socket socket, BlockingQueue<Message> messageQueue, ConcurrentMap<byte[], Boolean> clientManagement) {
    this.socket = socket;
    this.messageQueue = messageQueue;
    this.clientManagement = clientManagement;
    this.username = new byte[1];
  }

  /**
   *
   */
  @Override
  public void run() {
    try (
        ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
        ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
        ) {
      Message m = (Message) in.readObject();
      if (m.getIdentifier() == MessageIdentifier.CONNECT_MESSAGE) {
        ConnectMessage cm = (ConnectMessage) m;
        this.username = cm.getUsername();
        clientManagement.put(cm.getUsername(), true);
        byte[] cntMsg = "Successfully Connected!".getBytes();
        ConnectResponse cr = new ConnectResponse(true, cntMsg.length, cntMsg);
        out.writeObject(cr);
      } else {
        // Handle failing request
        handleFailedMsg(out, "Client should connect first");
        socket.close();
        throw new IllegalArgumentException("Connect unsuccessfully");
      }
      handleClient(in, out);
      socket.close();
    } catch (IOException | ClassNotFoundException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  /**
   *
   * @param in
   * @param out
   * @throws InterruptedException
   * @throws IOException
   * @throws ClassNotFoundException
   */
  private void handleClient(ObjectInputStream in, ObjectOutputStream out)
      throws InterruptedException, IOException, ClassNotFoundException {
    while (true) {
      // Handle message taken from the queue
      Message msgFromQueue = messageQueue.take();
      handleQueueRequest(msgFromQueue, out);

      // Handle request obtained by user
      Message request = (Message) in.readObject();
      // Handle disconnect
      if (request.getIdentifier() == MessageIdentifier.DISCONNECT_MESSAGE) {
        DisconnectMessage dm = (DisconnectMessage) request;
        // If the message is not for this thread, then put it back and ignore it.
        if (!Arrays.equals(username, dm.getUsername())) {
          messageQueue.add(request);
          continue;
        }
        // Check if the username is inside the client map
        if (!clientManagement.containsKey(dm.getUsername())) {
          handleFailedMsg(out, "The client doesn't exist");
        }
        // Disconnect
        clientManagement.remove(dm.getUsername());
        // Create disconnect response
        byte[] message = "See you again".getBytes();
        DisconnectResponse dr = new DisconnectResponse(true, message.length, message);
        // Write to the client
        out.writeObject(dr);
        break;
      }
      // Handle other
      if (!handleRequest(request, out)) {
        handleFailedMsg(out, "The request failed due to incorrect username.");
      }
    }
  }

  /**
   *
   * @param request
   * @param out
   * @return
   * @throws IOException
   */
  private boolean handleRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
      // If broadcast, then every one should know
      case BROADCAST_MESSAGE:
        BroadcastMessage bm = (BroadcastMessage) request;
        if (!Arrays.equals(username, bm.getUsername())) {
          return false;
        }
        messageQueue.add(request);
        break;
      // If user want the list of connected users
      case QUERY_CONNECTED_USERS:
        QueryUsersMessage qu = (QueryUsersMessage) request;
        if (!Arrays.equals(username, qu.getUsername())) {
          return false;
        }
        List<Pair<Integer, byte[]>> userList = new ArrayList<>();
        for (byte[] username : clientManagement.keySet()) {
          Pair<Integer, byte[]> user = new Pair<>(username.length, username);
          userList.add(user);
        }
        // Create a new query response containing all the users
        QueryResponse qr = new QueryResponse(clientManagement.keySet().size(), userList);
        out.writeObject(qr);
        break;
      // If user wants to send a direct message to the other user
      case DIRECT_MESSAGE:
        DirectMessage dm = (DirectMessage) request;
        if (!Arrays.equals(username, dm.getUsername())) {
          return false;
        }
        messageQueue.add(request);
        break;
      // If user wants to send an insult to the other user and broadcast to the chat room
      case SEND_INSULT:
        SendInsultMessage si = (SendInsultMessage) request;
        if (!Arrays.equals(username, si.getUsername())) {
          return false;
        }
        messageQueue.add(request);
        break;
    }
    return true;
  }

  /**
   *
   * @param out
   * @param description
   * @throws IOException
   */
  public void handleFailedMsg(ObjectOutputStream out, String description) throws IOException {
    byte[] failedMsg = description.getBytes();
    FailedMessage fm = new FailedMessage(failedMsg.length, failedMsg);
    out.writeObject(fm);
  }

  /**
   *
   * @param request
   * @param out
   * @throws IOException
   */
  public void handleQueueRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
      case SEND_INSULT:
        // Gets the message from queue
        SendInsultMessage si = (SendInsultMessage) request;
        // Check if the user already gotten the message
        if (!si.getOtherUsers().contains(username)) {
          out.writeObject(si);
          si.addUsers(username);
        }
        // Check if all the users already gotten the message
        if (si.getOtherUsers().size() < clientManagement.keySet().size()) {
          messageQueue.add(si);
        }
        break;
      case DIRECT_MESSAGE:
        DirectMessage dm = (DirectMessage) request;
        // Check if the message is for this user
        if (Arrays.equals(username, dm.getRecipientUsername())) {
          out.writeObject(dm);
        } else { // If not for this user then put it back
          messageQueue.add(dm);
        }
        break;
      case BROADCAST_MESSAGE:
        // Gets the message from queue
        BroadcastMessage bm = (BroadcastMessage) request;
        // Check if the user already gotten the message
        if (!bm.getOtherUsers().contains(username)) {
          out.writeObject(bm);
          bm.addUsers(username);
        }
        // Check if all the users already gotten the message
        if (bm.getOtherUsers().size() < clientManagement.keySet().size()) {
          messageQueue.add(bm);
        }
        break;
    }
  }

我想为我的服务器做 JUnit 测试。测试这样的多线程服务器的最佳方法是什么?

这是我正在尝试的 JUnit 测试代码。我首先启动一个被服务器接受的线程。然后我将启动一个客户端并假装客户端正在向服务器发送一些东西。我首先想尝试一个 connect_message 来看看连接是如何工作的。但到目前为止,该测试似乎没有对 JUnit 测试做出响应。它只是继续运行,没有任何事情发生

public class ClientThreadTest {

  private Thread foo;
  private List<ClientThread> clientList;
  private BlockingQueue<Message> messageQueue;
  private ConcurrentMap<byte[], Boolean> clientManagement;
  private static final int PORT = 3000;

  @Before
  public void setUp() throws Exception {
    messageQueue = new LinkedBlockingQueue<>();
    clientManagement = new ConcurrentHashMap<>();
  }

  @Test
  public void run() throws IOException, ClassNotFoundException {
    ServerSocket socket = new ServerSocket(PORT);
    foo = new Thread(new ClientThread(socket.accept(), messageQueue, clientManagement));
    foo.start();
    Socket fooClient = new Socket("localhost", PORT);
    ObjectOutputStream out = new ObjectOutputStream(fooClient.getOutputStream());
    ObjectInputStream in = new ObjectInputStream(fooClient.getInputStream());
    // First test Connection message
    byte[] username = "foo".getBytes();
    ConnectMessage cm = new ConnectMessage(username.length, username);
    // The message need to get
    byte[] cntMsg = "Successfully Connected!".getBytes();
    ConnectResponse cr = new ConnectResponse(true, cntMsg.length, cntMsg);
    out.writeObject(cm);
    ConnectResponse m = (ConnectResponse) in.readObject();
    System.out.println(Arrays.toString(m.getMessage()));
  }

标签: javamultithreadingsocketsserverjunit4

解决方案


我已经解决了我自己的问题!

对于在多线程服务器上进行 JUnit 测试的任何人。这是我的建议:

您必须从一开始就启动主服务器,然后再进行其他任何操作。让你的主服务器监听你给它的一些端口。

然后你必须启动你的客户端,你必须给它你给主服务器的相同端口号

最后但同样重要的是,您可以启动线程来处理特定的客户。不知何故,如果我将我的线程实例化为 ClientThread foo,并且我调用了 foo.run(),它将无法工作。我必须实例化 Thread foo,并将我的 ClientThread() 作为 Thread() 的输入,然后调用 foo.start()!

现在它正在工作!


推荐阅读