首页 > 解决方案 > StreamCorruptedException after ~1000 transferred objects

问题描述

I want to transfer objects (AssignmentListener) from one Java Server to 5 Java Clients.

Therefore I wrote a method to send out the message:

    private void sendMessage(AssignmentListener listener, int[] subpartitionIndices){
        boolean success = false;
        int failCount = 0;

        // retry for the case of failure
        while(!success && failCount < 10) {
            try {
                // get the stored socket & stream if stored
                if(listener.getSocket() == null) {
                    if (localMode) {
                        listener.setSocket(new Socket("localhost", listener.getPort()));
                    } else {
                        listener.setSocket(new Socket(listener.getIp(), listener.getPort()));
                    }
                    listener.setOutputStream(new ObjectOutputStream(listener.getSocket().getOutputStream()));
                }

                AssignmentListenerMessage assignmentListenerMessage = new AssignmentListenerMessage(subpartitionIndices);
                System.out.println("Sending " + assignmentListenerMessage);

                listener.getOutputStream().writeObject(assignmentListenerMessage);
                listener.getOutputStream().flush();

                success = true;

            } catch (IOException se) {
                se.printStackTrace();
                System.err.println("Failed to forward " + Arrays.toString(subpartitionIndices) + " to " + listener);
                failCount++;
            }
        }
    }

On the client side, I have the following:

    public void run() {

        String mode = "remote";
        if(localMode) mode = "local";
        // we need to register this listener at at the OverpartitioningManager
        if(register(isLocalRequest)) System.out.println("Registered AssignmentListenerServer for index "+subpartitionIndex+" at ForwardingServer -  "+mode);

        running = true;
        while (running) {
            try {
                socket = serverSocket.accept();


                // Pass the socket to the RequestHandler thread for processing
                RequestHandler requestHandler = new RequestHandler( socket );
                requestHandler.start();

            } catch (SocketException se) {
                se.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }


        }
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }





    class RequestHandler extends Thread {
        private Socket socket;

        RequestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                System.out.println("Received a connection");

                // Get input and output streams
                inStream = new ObjectInputStream(socket.getInputStream());

                //outStream = new DataOutputStream(socket.getOutputStream());
                AssignmentListenerMessage incomingMessage = null;
                while(socket.isBound()) {
                    try {
                        incomingMessage = (AssignmentListenerMessage) inStream.readObject();
                    }catch (StreamCorruptedException sce){
                        System.out.println("Failed to read AssignmentMessage from Stream, but will try again... (no ack)");
                        sce.printStackTrace();
                        continue;
                    }

                        // do stuff with the message

                }

                // Close our connection
                inStream.close();
                socket.close();

                System.out.println("Connection closed");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

This works multiple times, but at one point I get the following exception:

java.io.StreamCorruptedException: invalid type code: 00

Does anyone have an idea or any other performance improvement for what I'm doing? Thanks.

标签: javasocketsnetworkingserializationobjectoutputstream

解决方案


推荐阅读