java - 如何测试多线程服务器
问题描述
我有一个主服务器线程,它负责监听指定端口,并接受所有想要连接到该端口的客户端。
/**
 * The main server: listens on {@link #PORT}, accepts client connections and
 * hands each accepted socket to a {@code ClientThread} run by a fixed-size pool.
 */
public class Server {
    /** Port number for the client connections. */
    private static final int PORT = 3000;
    /** Size of the worker pool, i.e. how many clients can be served at the same time. */
    private static final int SIZE = 10;
    /** Executor that runs one {@code ClientThread} per accepted connection. */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(SIZE);

    /**
     * Starts the main server.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Queue shared by every client thread.
        BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
        // Map of connected clients shared by every client thread.
        // NOTE(review): byte[] keys are compared by identity, not by content, so a
        // lookup only succeeds with the very same array instance that was inserted.
        ConcurrentMap<byte[], Boolean> clientManagement = new ConcurrentHashMap<>();
        runMainServer(messageQueue, clientManagement);
    }

    /**
     * Runs the accept loop until the server socket fails.
     *
     * @param messageQueue     queue passed to every client thread
     * @param clientManagement map of connected clients passed to every client thread
     */
    private static void runMainServer(BlockingQueue<Message> messageQueue, ConcurrentMap<byte[], Boolean> clientManagement) {
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("Starting server");
            while (true) {
                System.out.println("Waiting for request");
                Socket socket = serverSocket.accept();
                System.out.println("Processing request");
                ClientThread newClient = new ClientThread(socket, messageQueue, clientManagement);
                executorService.submit(newClient);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Without this the pool's non-daemon threads keep the JVM alive after the
            // accept loop has died. shutdown() lets already-running clients finish.
            executorService.shutdown();
        }
    }
}
此外,我还有多个子线程,每个线程负责处理一个客户端。服务器先接受客户端的连接,然后检查收到的第一条消息是否为 connect_message;如果是,该客户端才算正式连接成功。除了 connect_message 之外还有其他类型的消息,这里就不逐一详述了。
/**
* The client thread.
*/
public class ClientThread implements Runnable {
/** Connection to the client served by this thread. */
private Socket socket;
/** Message queue handed in by the creator of this thread. */
private BlockingQueue<Message> messageQueue;
/** Map of connected clients handed in by the creator of this thread. */
private ConcurrentMap<byte[], Boolean> clientManagement;
/** Username of the served client; starts as a one-byte placeholder array. */
private byte[] username = new byte[1];

/**
 * Creates the handler for one accepted client connection.
 *
 * @param socket           the socket connected to the client
 * @param messageQueue     the queue used to exchange messages
 * @param clientManagement the map that records connected clients
 */
public ClientThread(Socket socket, BlockingQueue<Message> messageQueue, ConcurrentMap<byte[], Boolean> clientManagement) {
    this.clientManagement = clientManagement;
    this.messageQueue = messageQueue;
    this.socket = socket;
}
/**
 * Serves one client. The first message must be a connect message: the username
 * is then recorded in the client map, a {@code ConnectResponse} is sent back and
 * the request loop in {@code handleClient} starts. Any other first message is
 * answered with a failure message and the connection is closed.
 */
@Override
public void run() {
    boolean registered = false;
    try (
        // The socket itself is a resource so it is closed even when creating a
        // stream fails.
        Socket client = socket;
        // Create the output stream first: its constructor writes the stream header
        // that the peer's ObjectInputStream constructor blocks on.
        ObjectOutputStream out = new ObjectOutputStream(client.getOutputStream());
    ) {
        out.flush();
        try (ObjectInputStream in = new ObjectInputStream(client.getInputStream())) {
            Message m = (Message) in.readObject();
            if (m.getIdentifier() != MessageIdentifier.CONNECT_MESSAGE) {
                // Handle failing request
                handleFailedMsg(out, "Client should connect first");
                System.err.println("Connect unsuccessfully");
                return;
            }
            ConnectMessage cm = (ConnectMessage) m;
            this.username = cm.getUsername();
            // Insert the exact array instance kept in this.username: byte[] keys are
            // matched by identity, so later lookups must use this same instance.
            clientManagement.put(this.username, true);
            registered = true;
            byte[] cntMsg = "Successfully Connected!".getBytes();
            ConnectResponse cr = new ConnectResponse(true, cntMsg.length, cntMsg);
            out.writeObject(cr);
            handleClient(in, out);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag for whoever owns this thread.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (IOException | ClassNotFoundException e) {
        e.printStackTrace();
    } finally {
        // Do not leave a stale entry behind when the connection ends abnormally;
        // removing an already-removed key is a no-op.
        if (registered) {
            clientManagement.remove(this.username);
        }
    }
}
/**
 * Request loop for a connected client. Each iteration delivers at most one
 * pending queue message, then reads and handles one request from the client.
 * The loop ends after a disconnect request for this client has been answered.
 *
 * @param in  stream the client's requests are read from
 * @param out stream responses are written to
 * @throws InterruptedException kept in the signature for existing callers; this
 *                              implementation no longer blocks on the queue
 * @throws IOException if reading a request or writing a response fails
 * @throws ClassNotFoundException if a received object's class cannot be found
 */
private void handleClient(ObjectInputStream in, ObjectOutputStream out)
        throws InterruptedException, IOException, ClassNotFoundException {
    while (true) {
        // Non-blocking poll: waiting with take() on an empty queue would stall this
        // thread before it ever reads a request, and requests are what fill the queue.
        Message msgFromQueue = messageQueue.poll();
        if (msgFromQueue != null) {
            handleQueueRequest(msgFromQueue, out);
        }
        // Handle request obtained by user
        Message request = (Message) in.readObject();
        // Handle disconnect
        if (request.getIdentifier() == MessageIdentifier.DISCONNECT_MESSAGE) {
            DisconnectMessage dm = (DisconnectMessage) request;
            // If the message is not for this thread, then put it back and ignore it.
            if (!Arrays.equals(username, dm.getUsername())) {
                messageQueue.add(request);
                continue;
            }
            // The contents already match, so use this thread's own array as the key:
            // byte[] keys are matched by identity and the array inside the received
            // message is a different instance that could never be found.
            // NOTE(review): assumes the map entry was inserted with this same
            // instance - confirm against the connect handling.
            if (clientManagement.remove(username) == null) {
                handleFailedMsg(out, "The client doesn't exist");
            } else {
                // Create disconnect response and write it to the client
                byte[] message = "See you again".getBytes();
                DisconnectResponse dr = new DisconnectResponse(true, message.length, message);
                out.writeObject(dr);
            }
            break;
        }
        // Handle other
        if (!handleRequest(request, out)) {
            handleFailedMsg(out, "The request failed due to incorrect username.");
        }
    }
}
/**
 * Handles one non-disconnect request from the client. Broadcast, direct and
 * insult messages are added to the message queue; a user-list query is answered
 * directly with a {@code QueryResponse}.
 *
 * @param request the request read from the client
 * @param out     stream the query response is written to
 * @return {@code false} when the username carried by the request differs from
 *         this client's username; {@code true} otherwise, including for request
 *         types that are not handled here
 * @throws IOException if writing the query response fails
 */
private boolean handleRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
        // If broadcast, then every one should know
        case BROADCAST_MESSAGE:
            BroadcastMessage bm = (BroadcastMessage) request;
            if (!Arrays.equals(username, bm.getUsername())) {
                return false;
            }
            messageQueue.add(request);
            break;
        // If user want the list of connected users
        case QUERY_CONNECTED_USERS:
            QueryUsersMessage qu = (QueryUsersMessage) request;
            if (!Arrays.equals(username, qu.getUsername())) {
                return false;
            }
            List<Pair<Integer, byte[]>> userList = new ArrayList<>();
            for (byte[] connectedUser : clientManagement.keySet()) {
                userList.add(new Pair<>(connectedUser.length, connectedUser));
            }
            // Take the count from the list just built: reading the map's size a
            // second time could disagree with the list if clients connect or
            // disconnect in between.
            QueryResponse qr = new QueryResponse(userList.size(), userList);
            out.writeObject(qr);
            break;
        // If user wants to send a direct message to the other user
        case DIRECT_MESSAGE:
            DirectMessage dm = (DirectMessage) request;
            if (!Arrays.equals(username, dm.getUsername())) {
                return false;
            }
            messageQueue.add(request);
            break;
        // If user wants to send an insult to the other user and broadcast to the chat room
        case SEND_INSULT:
            SendInsultMessage si = (SendInsultMessage) request;
            if (!Arrays.equals(username, si.getUsername())) {
                return false;
            }
            messageQueue.add(request);
            break;
    }
    return true;
}
/**
 * Sends a {@code FailedMessage} carrying the given description to the client.
 *
 * @param out         stream the failure message is written to
 * @param description text of the failure; sent as its bytes together with their length
 * @throws IOException if writing to the stream fails
 */
public void handleFailedMsg(ObjectOutputStream out, String description) throws IOException {
    byte[] payload = description.getBytes();
    out.writeObject(new FailedMessage(payload.length, payload));
}
/**
 * Processes one message taken from the shared queue on behalf of this client.
 * Broadcast and insult messages are written to the client once and re-queued
 * while fewer users have received them than there are entries in the client
 * map. Direct messages are written only when addressed to this client and are
 * re-queued otherwise. Other message types are ignored.
 *
 * @param request the message taken from the queue
 * @param out     stream used to forward the message to the client
 * @throws IOException if writing to the stream fails
 */
public void handleQueueRequest(Message request, ObjectOutputStream out) throws IOException {
    switch (request.getIdentifier()) {
        case BROADCAST_MESSAGE: {
            BroadcastMessage broadcast = (BroadcastMessage) request;
            // Forward only if this user has not received it yet.
            boolean seen = broadcast.getOtherUsers().contains(username);
            if (!seen) {
                out.writeObject(broadcast);
                broadcast.addUsers(username);
            }
            // Keep it circulating until every connected user has received it.
            int delivered = broadcast.getOtherUsers().size();
            int connected = clientManagement.keySet().size();
            if (delivered < connected) {
                messageQueue.add(broadcast);
            }
            break;
        }
        case SEND_INSULT: {
            SendInsultMessage insult = (SendInsultMessage) request;
            // Forward only if this user has not received it yet.
            boolean seen = insult.getOtherUsers().contains(username);
            if (!seen) {
                out.writeObject(insult);
                insult.addUsers(username);
            }
            // Keep it circulating until every connected user has received it.
            int delivered = insult.getOtherUsers().size();
            int connected = clientManagement.keySet().size();
            if (delivered < connected) {
                messageQueue.add(insult);
            }
            break;
        }
        case DIRECT_MESSAGE: {
            DirectMessage direct = (DirectMessage) request;
            boolean forSomeoneElse = !Arrays.equals(username, direct.getRecipientUsername());
            if (forSomeoneElse) {
                // Not for this user: put it back for another thread.
                messageQueue.add(direct);
            } else {
                out.writeObject(direct);
            }
            break;
        }
        default:
            break;
    }
}
我想为我的服务器做 JUnit 测试。测试这样的多线程服务器的最佳方法是什么?
这是我正在尝试的 JUnit 测试代码。我先启动一个线程来处理被服务器接受的连接,然后启动一个客户端,模拟客户端向服务器发送消息。我想先用 connect_message 测试连接是否正常工作。但到目前为止,这个测试没有任何响应:它只是一直运行,什么也没有发生。
public class ClientThreadTest {
/** Thread that runs the client handler under test. */
private Thread foo;
/** Client handlers created by the tests (not used by the test shown here). */
private List<ClientThread> clientList;
/** Message queue handed to the client handler; recreated before every test. */
private BlockingQueue<Message> messageQueue;
/** Client map handed to the client handler; recreated before every test. */
private ConcurrentMap<byte[], Boolean> clientManagement;
/** Port the test server socket listens on. */
private static final int PORT = 3000;

/**
 * Gives every test a fresh, empty client map and message queue.
 *
 * @throws Exception declared for the test framework; nothing here throws
 */
@Before
public void setUp() throws Exception {
    clientManagement = new ConcurrentHashMap<>();
    messageQueue = new LinkedBlockingQueue<>();
}
/**
 * Connects a client to a local server socket, sends a connect message and
 * checks that the handler answers with the expected connect response.
 *
 * @throws IOException if a socket or stream operation fails
 * @throws ClassNotFoundException if the response class cannot be found
 */
@Test
public void run() throws IOException, ClassNotFoundException {
    try (
        ServerSocket server = new ServerSocket(PORT);
        // Connect BEFORE calling accept(): accept() blocks until a client has
        // connected, so calling it first on the test thread hangs forever. The
        // server socket is already listening once it is constructed.
        Socket fooClient = new Socket("localhost", PORT);
        Socket accepted = server.accept();
    ) {
        foo = new Thread(new ClientThread(accepted, messageQueue, clientManagement));
        foo.start();
        try {
            ObjectOutputStream out = new ObjectOutputStream(fooClient.getOutputStream());
            ObjectInputStream in = new ObjectInputStream(fooClient.getInputStream());
            // First test Connection message
            byte[] username = "foo".getBytes();
            ConnectMessage cm = new ConnectMessage(username.length, username);
            // The message need to get
            byte[] cntMsg = "Successfully Connected!".getBytes();
            ConnectResponse cr = new ConnectResponse(true, cntMsg.length, cntMsg);
            out.writeObject(cm);
            out.flush();
            ConnectResponse m = (ConnectResponse) in.readObject();
            System.out.println(Arrays.toString(m.getMessage()));
            if (!Arrays.equals(cr.getMessage(), m.getMessage())) {
                throw new AssertionError("Unexpected connect response: " + Arrays.toString(m.getMessage()));
            }
        } finally {
            // Release the handler thread if it is waiting; closing the sockets when
            // the enclosing try ends unblocks any pending read.
            foo.interrupt();
        }
    }
}
解决方案
我已经解决了我自己的问题!
给所有需要对多线程服务器做 JUnit 测试的人,以下是我的建议:
必须在进行其他任何操作之前先启动主服务器,并让它监听你指定的端口。
然后启动客户端,并让它使用与主服务器相同的端口号。
最后同样重要的是,再启动处理该客户端的线程。注意:如果把线程实例化为 ClientThread foo 并直接调用 foo.run(),是行不通的——直接调用 run() 只会在当前(测试)线程中同步执行,并不会启动新线程。必须创建 Thread foo,把 ClientThread 实例作为参数传给 Thread 的构造函数,然后调用 foo.start()!
现在它正在工作!
推荐阅读
- windows - 如何更改 WSL 选项卡中的背景颜色
- docker - 在 Selenium docker 中 Chrome 下载失败
- typescript - TypeScript 交集减少为“从不”,因为在使用泛型时,属性“长度”在某些成分中具有冲突类型
- javascript - 如何从数组中拉出一个项目(这是一个对象),它是使用 Mongoose 的 MongoDB 集合中文档的属性?
- python - 为什么切片熊猫数据帧时会出现速度差异?
- swift - “某些视图”属性的快照问题
- jetstream - Laravel JetStream - 忘记密码转到 http 而不是 https
- python-3.x - 有没有办法等待多处理管道,直到它有信息?
- python - 当pygame说缺少必需的参数'dest'(pos 2)时,它是什么意思?
- pddl - PDDL 规划器无法识别类型