首页 > 技术文章 > Netty 之 自定义协议

vettel0329 2020-03-19 11:36 原文

1.导入maven依赖

  <dependencies>

    <!-- ...... 其他依赖 ...... -->

    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.39.Final</version>
    </dependency>
  </dependencies>

 

 

2.服务端部分

  a.创建服务端启动类

/**
 * TCP server for the custom "MyProtocol" wire format.
 *
 * <p>Every accepted connection gets a pipeline made of a delimiter-based frame
 * splitter, {@code MyDecoder}, {@code MyEncoder} and {@code ProtocolHandler}.
 * The class is a singleton; obtain it through {@link #getInstance()}.
 */
public class NettyServer {

    // Singleton instance.
    private static final NettyServer instance = new NettyServer();
    private NettyServer() {}
    public static NettyServer getInstance(){
        return instance;
    }

    // Frame delimiter: marks the end of every encoded message on the wire.
    public static final String DELIMITER = "MyProtocol/1.0";
    // Protocol number 1
    public static final int PROTOCOL_NO_1 = 1;
    // Protocol number 2
    public static final int PROTOCOL_NO_2 = 2;

    /**
     * Binds to {@code port} and blocks until the server channel is closed.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void start(int port) throws Exception {
        // Accepts incoming connections. One thread is enough for a single listening socket.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // Handles I/O of accepted connections (Netty's default thread count is used).
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            // Build the handler chain for every accepted child channel.
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) {
                    // Splits the byte stream into frames on DELIMITER (1024 is the maximum frame length).
                    // An explicit charset keeps the delimiter bytes independent of the platform default.
                    channel.pipeline().addLast("delimiterDecoder", new DelimiterBasedFrameDecoder(1024,
                            Unpooled.wrappedBuffer(DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8))));
                    // Custom decoder / encoder
                    channel.pipeline().addLast("myDecoder", new MyDecoder());
                    channel.pipeline().addLast("myEncoder", new MyEncoder());
                    // Business handler
                    channel.pipeline().addLast("protocolHandler", new ProtocolHandler());
                }
            });
            // Maximum length of the queue of pending (already established) connections.
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // Enable TCP keep-alive on accepted connections.
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Start binding. The listener is registered BEFORE waiting: sync() rethrows the
            // failure cause, so a listener added after sync() could never report a failure.
            ChannelFuture future = bootstrap.bind(port);
            future.addListener(f -> {
                if (f.isSuccess()) {
                    System.out.println("服务启动成功");
                } else {
                    System.out.println("服务启动失败");
                }
            });
            future.sync();
            // Block until the listening channel is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    /** Starts the server on port 8080. */
    public static void main(String[] args) {
        try {
            NettyServer.getInstance().start(8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 

  b.创建自定义协议VO

public class MyProtocol {

    private int protocolNo;
    private int contentLength;
    private byte[] content;

    public MyProtocol(int protocolNo, byte[] content) {
        this.protocolNo = protocolNo;
        this.content = content;
        this.contentLength = content.length;
    }

    public MyProtocol(int protocolNo, int contentLength, byte[] content) {
        this.protocolNo = protocolNo;
        this.contentLength = contentLength;
        this.content = content;
    }

    public int getProtocolNo() {
        return protocolNo;
    }

    public void setProtocolNo(int protocolNo) {
        this.protocolNo = protocolNo;
    }

    public int getContentLength() {
        return contentLength;
    }

    public void setContentLength(int contentLength) {
        this.contentLength = contentLength;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }

}

 

  c.创建自定义协议解码器

/**
 * Decodes one frame into a {@code MyProtocol}.
 *
 * <p>Frame layout: protocol number (int), content length (int), then the
 * content bytes. This decoder is installed behind a delimiter-based frame
 * splitter, so each buffer it sees is expected to hold one complete frame.
 */
public class MyDecoder extends ByteToMessageDecoder {

    /** Size of the fixed header: protocol number (int) + content length (int). */
    private static final int HEADER_LENGTH = 2 * Integer.BYTES;

    /**
     * Reads one message from {@code byteBuf} and appends it to {@code list}.
     *
     * <p>A malformed frame is discarded in full before the exception is thrown,
     * so leftover bytes cannot be mis-parsed as part of the next frame.
     *
     * @throws IllegalStateException if the frame is shorter than the header, or the
     *                               length field is negative or exceeds the bytes present
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int available = byteBuf.readableBytes();
        if (available < HEADER_LENGTH) {
            byteBuf.skipBytes(available);
            throw new IllegalStateException(
                    "Frame too short: " + available + " bytes, header needs " + HEADER_LENGTH);
        }

        int protocolNo = byteBuf.readInt();
        int contentLength = byteBuf.readInt();

        // The length field is untrusted input: validate it before allocating the array.
        int remaining = byteBuf.readableBytes();
        if (contentLength < 0 || contentLength > remaining) {
            byteBuf.skipBytes(remaining);
            throw new IllegalStateException(
                    "Invalid content length " + contentLength + " (" + remaining + " bytes available)");
        }

        byte[] content = new byte[contentLength];
        byteBuf.readBytes(content);

        MyProtocol protocol = new MyProtocol(protocolNo, contentLength, content);
        list.add(protocol);
    }
}

 

  d.创建自定义协议编码器

/**
 * Encodes a {@code MyProtocol} as: protocol number (int), content length (int),
 * content bytes, followed by the frame delimiter {@code NettyServer.DELIMITER}.
 */
public class MyEncoder extends MessageToByteEncoder<MyProtocol> {

    /**
     * Writes {@code myProtocol} into {@code byteBuf}.
     *
     * <p>The length written to the header is taken from the payload actually
     * written, so the header can never disagree with the body. A {@code null}
     * payload is encoded as an empty one.
     */
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MyProtocol myProtocol, ByteBuf byteBuf) throws Exception {
        byte[] content = myProtocol.getContent();
        int length = content == null ? 0 : content.length;

        byteBuf.writeInt(myProtocol.getProtocolNo());
        byteBuf.writeInt(length);
        if (length > 0) {
            byteBuf.writeBytes(content);
        }
        // Frame delimiter; an explicit charset keeps the bytes independent of the platform default.
        byteBuf.writeBytes(NettyServer.DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}

 

  e.创建自定义协议处理器

/**
 * Server-side handler: processes each decoded {@code MyProtocol} on the
 * {@code PoolManager} thread pool and answers with a message of the other
 * protocol type.
 */
public class ProtocolHandler extends SimpleChannelInboundHandler<MyProtocol> {

    /**
     * Submits the handling of {@code myProtocol} to the pool. The task handles the
     * message, then writes and flushes the reply on the originating channel.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol myProtocol) throws Exception {
        final Channel peer = ctx.channel();

        BaseTask job = new BaseTask() {
            @Override
            public void doWork() {
                try {
                    ProtocolManager manager = ProtocolManager.getInstance();

                    // Handle the incoming message.
                    manager.handleProtocol(myProtocol);

                    // Build the reply: protocol 1 is answered with protocol 2, anything else with protocol 1.
                    MyProtocol reply;
                    if (myProtocol.getProtocolNo() != NettyServer.PROTOCOL_NO_1) {
                        reply = manager.getProtocol1("Protocol_1", 11, 11.5f);
                    } else {
                        reply = manager.getProtocol2((byte) 13, 'z', true);
                    }

                    peer.write(reply);
                    peer.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };

        PoolManager.getInstance().addTask(job);
    }

}

 

  f.创建协议管理工具

public class ProtocolManager {

    //单例
    private static ProtocolManager instance = new ProtocolManager();
    private ProtocolManager() {}
    public static ProtocolManager getInstance(){
        return instance;
    }

    public void handleProtocol(MyProtocol myProtocol) throws Exception {
        int protocolNo = myProtocol.getProtocolNo();
        byte[] content = myProtocol.getContent();
        DataInputStream is = new DataInputStream(new ByteArrayInputStream(content));
        if(protocolNo == NettyServer.PROTOCOL_NO_1){
            String param1 = is.readUTF();
            int param2 = is.readInt();
            float param3 = is.readFloat();
            System.out.println("接收数据:String[" + param1 + "], int[" + param2 + "], float[" + param3 + "]");
        }else if(protocolNo == NettyServer.PROTOCOL_NO_2){
            byte param1 = is.readByte();
            char param2 = is.readChar();
            boolean param3 = is.readBoolean();
            System.out.println("接收数据:byte[" + param1 + "], char[" + param2 + "], boolean[" + param3 + "]");
        }
    }

    public MyProtocol getProtocol1(String param1, int param2, float param3) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeUTF(param1);
        dos.writeInt(param2);
        dos.writeFloat(param3);
        byte[] content = bos.toByteArray();
        return new MyProtocol(NettyServer.PROTOCOL_NO_1, content);
    }

    public MyProtocol getProtocol2(byte param1, char param2, boolean param3) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeByte(param1);
        dos.writeChar(param2);
        dos.writeBoolean(param3);
        byte[] content = bos.toByteArray();
        return new MyProtocol(NettyServer.PROTOCOL_NO_2, content);
    }

}

 

 

3.客户端部分

  a.创建客户端启动类

/**
 * TCP client for the custom "MyProtocol" wire format.
 *
 * <p>The channel pipeline consists of a delimiter-based frame splitter,
 * {@code MyDecoder}, {@code MyEncoder} and {@code ClientProtocolHandler}.
 * The class is a singleton; obtain it through {@link #getInstance()}.
 */
public class NettyClient {

    // Singleton instance.
    private static final NettyClient instance = new NettyClient();
    private NettyClient() {}
    public static NettyClient getInstance(){
        return instance;
    }

    /**
     * Connects to {@code ip:port} and blocks until the connection is closed.
     *
     * @param ip   server host or IP address
     * @param port server TCP port
     * @throws Exception if the connection attempt fails or the waiting thread is interrupted
     */
    public void connect(String ip, int port) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) {
                    // Splits the byte stream into frames on the delimiter (1024 is the maximum frame length).
                    // An explicit charset keeps the delimiter bytes independent of the platform default.
                    channel.pipeline().addLast("delimiterDecoder", new DelimiterBasedFrameDecoder(1024,
                            Unpooled.wrappedBuffer(NettyServer.DELIMITER.getBytes(java.nio.charset.StandardCharsets.UTF_8))));
                    // Custom decoder / encoder
                    channel.pipeline().addLast("myDecoder", new MyDecoder());
                    channel.pipeline().addLast("myEncoder", new MyEncoder());
                    // Business handler
                    channel.pipeline().addLast("protocolHandler", new ClientProtocolHandler());
                }
            });
            bootstrap.option(ChannelOption.TCP_NODELAY, true);

            // Start connecting. The listener is registered BEFORE waiting: sync() rethrows the
            // failure cause, so a listener added after sync() could never report a failure.
            ChannelFuture future = bootstrap.connect(ip, port);
            future.addListener(f -> {
                if (f.isSuccess()) {
                    System.out.println("客户端连接成功");
                } else {
                    System.out.println("客户端连接失败");
                }
            });
            future.sync();
            // Block until the connection is closed.
            future.channel().closeFuture().sync();
        } finally {
            // Release the event loop threads.
            group.shutdownGracefully();
        }
    }


    /** Connects to a server on 127.0.0.1:8080. */
    public static void main(String[] args) {
        try {
            NettyClient.getInstance().connect("127.0.0.1", 8080);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

 

  b.创建自定义协议处理类

/**
 * Client-side handler: sends an initial protocol-1 message once the channel is
 * active, then answers every received message from the {@code PoolManager}
 * thread pool after a 5 second pause.
 */
public class ClientProtocolHandler extends SimpleChannelInboundHandler<MyProtocol> {

    /** Sends the first message as soon as the connection is established. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        MyProtocol protocol = ProtocolManager.getInstance().getProtocol1("Hello World", 10, 12.6f);
        ctx.channel().writeAndFlush(protocol);
    }

    /**
     * Submits the handling of {@code myProtocol} to the pool. The task handles the
     * message, sleeps for 5 seconds, then writes and flushes the follow-up message(s).
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyProtocol myProtocol) throws Exception {
        Channel channel = ctx.channel();
        PoolManager.getInstance().addTask(new BaseTask() {
            @Override
            public void doWork() {
                try {
                    // Handle the incoming message.
                    ProtocolManager.getInstance().handleProtocol(myProtocol);

                    // TODO: sleep 5 seconds
                    Thread.sleep(5 * 1000);

                    // Send the follow-up: protocol 1 is answered with one message of each
                    // protocol, anything else with a single protocol-2 message.
                    if(myProtocol.getProtocolNo() == NettyServer.PROTOCOL_NO_1){
                        MyProtocol protocol1 = ProtocolManager.getInstance().getProtocol1("Protocol_1", 1, 1.5f);
                        channel.write(protocol1);

                        MyProtocol protocol2 = ProtocolManager.getInstance().getProtocol2((byte) 2, 'b', true);
                        channel.write(protocol2);
                    }else{
                        MyProtocol protocol2 = ProtocolManager.getInstance().getProtocol2((byte) 3, 'c', true);
                        channel.write(protocol2);
                    }
                    channel.flush();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the pool thread can observe the interruption
                    // instead of having it silently cleared by the generic catch below.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
        });
    }

}

 

 

4.线程池部分

  a.创建任务VO

/**
 * Unit of work with optional before/after hooks. Subclasses must implement
 * {@link #doWork()}; the two hooks do nothing unless overridden.
 */
public abstract class BaseTask {

    /** Hook invoked before {@link #doWork()}; empty by default. */
    public void doBeforeWork() {
    }

    /** The actual work of the task. */
    public abstract void doWork();

    /** Hook invoked after {@link #doWork()}; empty by default. */
    public void doAfterWork() {
    }

}

 

  b.创建线程池管理类

/**
 * Singleton wrapper around a {@link ThreadPoolExecutor} that runs
 * {@code BaseTask} instances off the caller's thread.
 *
 * <p>NOTE(review): the work queue is an unbounded {@link LinkedBlockingQueue}.
 * Per the {@code ThreadPoolExecutor} documentation, with an unbounded queue no
 * more than the core number of threads is ever created, so the maximum pool
 * size below has no effect. Kept as-is to preserve the existing behavior.
 */
public class PoolManager {

    private static final int CORE_POOL_SIZE = 4;
    private static final int MAX_POOL_SIZE = 8;
    /** Idle time, in milliseconds, after which threads above the core size may be reclaimed. */
    private static final long KEEP_ALIVE_MILLIS = 10 * 1000;

    // Singleton
    private static final PoolManager instance = new PoolManager();
    public static PoolManager getInstance(){
        return instance;
    }

    private final ExecutorService pool;
    private final BlockingQueue<Runnable> queue;

    private PoolManager() {
        queue = new LinkedBlockingQueue<>();
        pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_MILLIS, TimeUnit.MILLISECONDS, queue);
    }

    /**
     * Schedules {@code task} on the pool. The hooks and the work run in order on
     * the same pool thread: {@code doBeforeWork}, {@code doWork}, {@code doAfterWork}.
     *
     * @param task the task to execute
     */
    public void addTask(BaseTask task){
        pool.execute(() -> {
            task.doBeforeWork();
            task.doWork();
            task.doAfterWork();
        });
    }

}

 

推荐阅读