首页 > 解决方案 > 基于套接字的应用程序的非确定性行为

问题描述

我正在开发客户端-服务器应用程序,但遇到了一个问题。我ServerWorker知道它负责一个连接的客户端,它创建 2 个线程,1 个用于侦听来自该客户端的传入数据,1 个用于向他发送数据。

class ServerWorker {
    private DataProcessor dataProcessor;
    private ObjectInputStream inputStream;
    private ObjectOutputStream outputStream;
    private Thread receiverThread;
    private Thread senderThread;
    private Optional<DataPacket> dataToSend;

    private ServerWorker(Socket socket) {
        try {
            dataToSend = Optional.empty();
            dataProcessor = new DataProcessor();
            receiverThread = new Thread(this::readAndProcessData);
            senderThread = new Thread(this::sendData);
            inputStream = new ObjectInputStream(socket.getInputStream());
            outputStream = new ObjectOutputStream(socket.getOutputStream());
        } catch (IOException e) {
            //TODO
        }
    }

    static ServerWorker create(Socket socket) {
        return new ServerWorker(socket);
    }

    void start() {
        receiverThread.start();
        senderThread.start();
    }

    void stop() {
        receiverThread.interrupt();
        senderThread.interrupt();
    }

    private void readAndProcessData() {
        DataPacket dataPacket;
        try {
            while((dataPacket = (DataPacket)inputStream.readObject()) != null) {
                System.out.println("incoming message: " + dataPacket.getContent());
                dataToSend = Optional.of(dataProcessor.process(dataPacket));
            }
        } catch (ClassNotFoundException | IOException e) {
            //TODO
        }
    }

    private void sendData() {
        while(true) { //TODO
            dataToSend.ifPresent(data -> {
                try {
                    outputStream.writeObject(data);
                    outputStream.flush();
                    dataToSend = Optional.empty();
                } catch (IOException e) {
                    //TODO
                }
            });
        }
    }
}

现在DataProcessor只是一个小班

public class DataProcessor {
   public DataPacket process(DataPacket packet){
        packet.setContent(packet.getContent().toUpperCase());
        return packet;
   }
}

当然,DataPacket这对于客户端和服务器都是一样的

public class DataPacket implements Serializable {
    private String content;

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

DataProcessor现在DataPackets只是一种 POC,这将成长为更大更复杂的类,长话短说,ServerWorker将接收数据并将其传递给进程,然后在完成一些逻辑后,返回的数据将存储在dataToSend变量中并在发送后删除. 问题是,我在上面发布的代码仅在某些时候有效。当我运行我的服务器应用程序和客户端一(下面的代码)时,90% 的时间没有任何反应,大写的“hello world”不会返回到客户端。有趣的是,当我在调试模式下运行我的服务器时(即使没有任何断点!),它也可以工作......知道到底出了什么问题吗?

 public static void main(String... args) throws Exception {
        Socket socket = new Socket("localhost", 9999);
        DataPacket dataPacket = new DataPacket();
        dataPacket.setContent("hello world");
        ObjectOutputStream os = new ObjectOutputStream(socket.getOutputStream());
        os.writeObject(dataPacket);
        os.flush();

        ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
        while((dataPacket = (DataPacket)inputStream.readObject()) != null) {
            System.out.println(dataPacket.getContent());
        }
    }

edit# 再添加一个类,ConnectionDispatcher负责创建ServerWorker对象

class ConnectionDispatcher implements Runnable {
    private ServerSocket serverSocket;
    private List<ServerWorker> serverWorkers;
    private volatile boolean isReceiving;

    private ConnectionDispatcher(int port) throws IOException {
        serverSocket = new ServerSocket(port);
        serverWorkers = new ArrayList<>();
        isReceiving = false;
    }

    static ConnectionDispatcher create(int port) throws IOException {
       return new ConnectionDispatcher(port);
    }

    @Override
    public void run() {
        isReceiving = true;
        while(isReceiving) {
           acceptIncomingConnections();
        }
    }

    private void acceptIncomingConnections() {
        try {
            ServerWorker worker = ServerWorker.create(serverSocket.accept());
            serverWorkers.add(worker);
            worker.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

标签: javamultithreadingsocketsnetworking

解决方案


似乎您正在dataToSend从一个线程进行修改,同时在另一个线程中读取它的值。这不是线程安全的,正在读取其值的线程可能永远不会看到其他线程设置的更新值。出于这个原因,我将声明dataToSendvolatile.

private volatile Optional<DataPacket> dataToSend;

我还没有机会亲自测试一下,但我可以在大约一个小时内完成(假设此更改不能解决您的问题)。


推荐阅读