首页 > 解决方案 > Netty server closes second client connection

问题描述

Netty 4.1.35.Final

I want to make a persistant client-server connection with multiple clients connecting to a netty server and the server broadcasting this message to all connected clients.

When a first client connects to the server, it works and the message is handled. To maintain the connection, ping messages are sent by the client and the server receives these.

The second connected client can send its message, but then the first connection isn't working anymore and only ping messages from the second client are received.

I'm very desperate and any help is greatly appreciated!

Server:

package ch.zhaw.psit3.net.server;

import ch.zhaw.psit3.models.Document;
import ch.zhaw.psit3.models.Enums.ConnectionState;
import ch.zhaw.psit3.models.NetworkState;
import ch.zhaw.psit3.models.Session;
import ch.zhaw.psit3.net.common.MessageDecoder;
import ch.zhaw.psit3.net.common.MessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Server implements IServer, Runnable {

    private final MessageBroadcaster broadcaster = new MessageBroadcaster();
    private final int port;
    private ServerListener listener;
    private Executor executor;
    private ServiceAdvertiser serviceAdvertiser;
    private final NetworkState networkState = new NetworkState();

    @Override
    public void start(Document document) throws IOException {
        /**
         * Start advertising with  https://github.com/jmdns/jmdns.
         */
        executor = Executors.newFixedThreadPool(1);
        executor.execute(this);
    }

    @Override
    public void stop() {

    }

    private void initializeState() {
        this.networkState.state = ConnectionState.NOT_CONNECTED;
        this.networkState.activeSession = new Session();
    }

    @Override
    public void setListener(ServerListener listener) {
        /*todo reevaluate if data field listener in this is needed*/
        this.listener = listener;
    }

    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9090;
        Server server = new Server(port);
        server.start(new Document());
    }

    public Server() {
        this.port = 9090;
        this.serviceAdvertiser = new ServiceAdvertiser(this.port);
        initializeState();
    }

    public Server(final int port) {
        this.port = port;
        this.serviceAdvertiser = new ServiceAdvertiser(this.port);
        initializeState();
    }

    @Override
    public void run() {
        /**
         * This thread is meant to handle the ot process with one client.
         * It has its own ot instance running and sends changes from the client back to the server, which distributes it to all other serverConnections
         *
         */
        if (!(Thread.currentThread() instanceof IServerConnection || !(Thread.currentThread() instanceof Executor))) {
            throw new UnsupportedOperationException(Thread.currentThread().getName() + " thread is not allowed to call the run method");
        }

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast("messageDecoder", new MessageDecoder());
                            ch.pipeline().addLast("messageEncoder", new MessageEncoder());
                            ch.pipeline().addLast("serverHandler", new ServerHandler(listener, broadcaster));
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException ie) {
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

ServerHandler:

package ch.zhaw.psit3.net.server;

import ch.zhaw.psit3.models.Message;
import io.netty.channel.*;

import java.util.Objects;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private final MessageBroadcaster broadcaster;
    private ServerListener serverListener;

    public ServerHandler(ServerListener serverListener, MessageBroadcaster broadcaster) {
        this.serverListener = serverListener;
        this.broadcaster = broadcaster;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object)
            throws Exception {
        Message message = (Message) object;
        System.out.println(message);
        if (Objects.nonNull(serverListener))
            serverListener.newMessage(message);
        message.setText(message.getText() + " returned from the server to client");
        broadcaster.newMessageToBroadcast(message, ctx.channel());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        broadcaster.addObserver(ctx.channel());
        super.channelActive(ctx);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        broadcaster.removeObserver(ctx.channel());
        super.channelInactive(ctx);
    }
}

MessageBroadcaster:

package ch.zhaw.psit3.net.server;

import ch.zhaw.psit3.models.Message;
import io.netty.channel.*;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class MessageBroadcaster {
    private Set<Channel> channels = Collections.synchronizedSet(new LinkedHashSet<>());

    public void addObserver(Channel channel) {
        channels.add(channel);
    }

    public void removeObserver(Channel channel) {
        channels.remove(channel);
    }

    public void newMessageToBroadcast(Message message, Channel originChannel) throws ChannelException {
        synchronized (channels) {
            for (Channel channel : channels) {
                if (!channel.equals(originChannel)) {
                    if (!channel.isActive()) {
                        System.out.println("channel not writable");
                    } else {
                        ChannelFuture future = channel.writeAndFlush(message);
                        future.addListener((ChannelFutureListener) future1 -> {
                            if (!future1.isSuccess()) {
                                throw new ChannelException("sending of message wasn't successful: ", future1.cause());
                            }
                        });
                    }
                }
            }
        }
    }
}

Client:

package ch.zhaw.psit3.net.client;

import ch.zhaw.psit3.models.Enums.ConnectionState;
import ch.zhaw.psit3.models.Enums.OperationType;
import ch.zhaw.psit3.models.NetworkState;
import ch.zhaw.psit3.models.Session;
import ch.zhaw.psit3.models.Message;
import ch.zhaw.psit3.net.common.MessageDecoder;
import ch.zhaw.psit3.net.common.MessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.ArrayList;
import java.util.concurrent.*;

public class Client implements IClient, Runnable {

    private ClientListener listener = null;
    private NetworkState networkState = new NetworkState();
    private final ClientHandler clientHandler = new ClientHandler(listener);
    private ExecutorService executor = null;

    /*TODO remove after testing core functionality*/
    public static void main(String[] args) {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 9090;
        Client client = new Client();
        client.networkState.activeSession.port = port;
        client.startClient();
        try {
            Thread.sleep(6000);
            for (int i = 3; i < 4; i++) {
                var msg = new Message();
                msg.setText("This is a test text :" + i);
                msg.setRange(i);
                msg.setTheirOperations(i * 2);
                msg.setMyOperations(i * 3);
                msg.setMyOperations(i * 4);
                msg.setOperationType(OperationType.ADD);
                client.sendMessage(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Client() {
        setInitialState();
    }

    private void setInitialState() {
        networkState.state = ConnectionState.NOT_CONNECTED;
        networkState.activeSession = new Session();
        networkState.sessions = new ArrayList<>();
        networkState.activeSession.ip = "127.0.0.1";
        networkState.activeSession.port = 9090;
    }


    private synchronized void startClient() {
        if (!(ConnectionState.CONNECTED_TO_SESSION == this.networkState.state)) {
            executor = Executors.newFixedThreadPool(1);
            executor.execute(this);
            this.networkState.state = ConnectionState.CONNECTED_TO_SESSION;
        }
    }

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("idleStateHandler", new IdleStateHandler(30, 30, 0));
                    ch.pipeline().addLast("messageEncoder", new MessageEncoder());
                    ch.pipeline().addLast("messageDecoder", new MessageDecoder());
                    ch.pipeline().addLast("clientHandler", clientHandler);
                }
            });

            ChannelFuture future = bootstrap.connect(networkState.activeSession.ip, networkState.activeSession.port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

ClientHandler:

package ch.zhaw.psit3.net.client;

import ch.zhaw.psit3.models.Message;
import ch.zhaw.psit3.net.common.PingMessage;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Objects;

public class ClientHandler extends ChannelInboundHandlerAdapter {
    private ChannelHandlerContext context;
    private ClientListener listener;

    public ClientHandler(ClientListener listener) {
        this.listener = listener;
    }

    void addListener(ClientListener listener) {
        this.listener = listener;
    }

    public void sendMessage(Message message) throws Exception {
        if (context != null) {
            System.out.println(message);
            ChannelFuture future = context.writeAndFlush(message);
            future.addListener((ChannelFutureListener) future1 -> {
                if (!future1.isSuccess()) {
                    throw new ChannelException("sending of message wasn't successful: ", future1.cause());
                }
            });
        } else {
            throw new ChannelException("Channel is not ready yet");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                ctx.writeAndFlush(new PingMessage());
            } else if (e.state() == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext context, Object object) throws Exception {
        Message message = (Message) object;
        /* TODO remove sysout */
        System.out.println(message);
        if (Objects.nonNull(listener))
            listener.newMessage(message);
        context.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext context) throws Exception {
        this.context = context;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        /* todo log and handle exceptions */
        cause.printStackTrace();
        context.close();
    }

    @Override
    public void channelInactive(ChannelHandlerContext context) throws Exception {
    }
}

标签: javaconcurrencynetty

解决方案


我想到了。我在 ClientHandler 的 channelRead() 中有示例的 context.close(),因此一旦收到消息,通道就会关闭。


推荐阅读