首页 > 技术文章 > netty websocket协议开发

huzi007 2016-07-05 08:21 原文

websocket的好处我们就不用多说了,就是用于解决长连接、服务推送等需要的一种技术。

 

以下我们来看一个例子:

 1 package com.ming.netty.http.websocket;
 2 
 3 import java.net.InetSocketAddress;
 4 
 5 import io.netty.bootstrap.ServerBootstrap;
 6 import io.netty.channel.ChannelFuture;
 7 import io.netty.channel.ChannelInitializer;
 8 import io.netty.channel.ChannelOption;
 9 import io.netty.channel.ChannelPipeline;
10 import io.netty.channel.EventLoopGroup;
11 import io.netty.channel.nio.NioEventLoopGroup;
12 import io.netty.channel.socket.SocketChannel;
13 import io.netty.channel.socket.nio.NioServerSocketChannel;
14 import io.netty.handler.codec.http.HttpObjectAggregator;
15 import io.netty.handler.codec.http.HttpServerCodec;
16 import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
17 import io.netty.handler.stream.ChunkedWriteHandler;
18 
19 public class WebSocketServer {
20 
21     
22     
23     public static void main(String[] args) {
24         new WebSocketServer().run("127.0.0.1", 8500);
25     }
26     
27     
28     public void run(String addr,int port){
29         EventLoopGroup bossGroup = new NioEventLoopGroup();
30         EventLoopGroup workerGroup = new NioEventLoopGroup();
31         try {
32              ServerBootstrap b=new ServerBootstrap();
33               b.group(bossGroup, workerGroup)
34              .channel(NioServerSocketChannel.class)
35              .option(ChannelOption.SO_BACKLOG, 128)
36              .childOption(ChannelOption.SO_KEEPALIVE, true)
37              .childHandler(new WebSocketServerHandlerInitializer()); 
38               
39               ChannelFuture f=b.bind(new InetSocketAddress(addr, port)).sync();
40               System.out.println("启动服务器:"+f.channel().localAddress());
41               //等等服务器端监听端口关闭
42               f.channel().closeFuture().sync();
43               
44         } catch (Exception e) {
45             e.printStackTrace();
46         }finally{
47             bossGroup.shutdownGracefully();
48             workerGroup.shutdownGracefully();
49         }
50     }
51     
52     protected class WebSocketServerHandlerInitializer extends ChannelInitializer<SocketChannel>{
53         
54         @Override
55         protected void initChannel(SocketChannel ch) throws Exception {
56             ChannelPipeline pipeline = ch.pipeline(); 
57             pipeline.addLast(new HttpServerCodec());
58             pipeline.addLast(new HttpObjectAggregator(64*1024));
59             pipeline.addLast(new ChunkedWriteHandler());
60             pipeline.addLast(new HttpRequestHandler("/ws"));
61             pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
62             pipeline.addLast(new TextWebSocketFrameHandler());
63         }
64         
65     }
66 }
  1 package com.ming.netty.http.websocket;
  2 
  3 import io.netty.channel.Channel;
  4 import io.netty.channel.ChannelFuture;
  5 import io.netty.channel.ChannelFutureListener;
  6 import io.netty.channel.ChannelHandlerContext;
  7 import io.netty.channel.DefaultFileRegion;
  8 import io.netty.channel.SimpleChannelInboundHandler;
  9 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 10 import io.netty.handler.codec.http.DefaultHttpResponse;
 11 import io.netty.handler.codec.http.FullHttpRequest;
 12 import io.netty.handler.codec.http.FullHttpResponse;
 13 import io.netty.handler.codec.http.HttpHeaders;
 14 import io.netty.handler.codec.http.HttpResponse;
 15 import io.netty.handler.codec.http.HttpResponseStatus;
 16 import io.netty.handler.codec.http.HttpVersion;
 17 import io.netty.handler.codec.http.LastHttpContent;
 18 import io.netty.handler.ssl.SslHandler;
 19 import io.netty.handler.stream.ChunkedNioFile;
 20 
 21 import java.io.File;
 22 import java.io.IOException;
 23 import java.io.RandomAccessFile;
 24 import java.net.URISyntaxException;
 25 import java.net.URL;
 26 
 27 public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 28 
 29     private final String wsUri;
 30     private static final File INDEX;
 31  
 32     // static HTTP request handling operation.
 33     static {
 34         URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
 35         try {
 36            String path = location.toURI() + "WebSocketClient.html";
 37            path = !path.contains("file:") ? path : path.substring(5);
 38            INDEX = new File("D:\\javaPro\\workspace\\NettyTest\\resources\\WebsocketChatClient.html");
 39          } catch (URISyntaxException e) {
 40            throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);
 41         }
 42   }
 43  
 44 
 45      public HttpRequestHandler(String wsUri) {
 46          this.wsUri = wsUri;
 47      }
 48 
 49  
 50  
 51  
 52  @Override
 53 protected void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
 54      channelRead(ctx,request);
 55     
 56 }
 57 
 58 
 59 
 60 
 61 public void channelRead(ChannelHandlerContext ctx, FullHttpRequest request)
 62    throws Exception {
 63   if (wsUri.equalsIgnoreCase(request.getUri())) {
 64    ctx.fireChannelRead(request.retain());
 65   } else {
 66    if (HttpHeaders.is100ContinueExpected(request)) {
 67     send100Continue(ctx);
 68    }
 69    
 70    RandomAccessFile file = new RandomAccessFile(INDEX, "r");
 71    HttpResponse response = new DefaultHttpResponse(
 72      request.getProtocolVersion(), HttpResponseStatus.OK);
 73      response.headers().set(HttpHeaders.Names.CONTENT_TYPE,
 74      "text/html; charset=UTF-8");
 75 
 76    boolean keepAlive = HttpHeaders.isKeepAlive(request);
 77    if (keepAlive) {
 78     response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,  file.length());
 79     response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
 80    }
 81    ctx.write(response);
 82 
 83    if (ctx.pipeline().get(SslHandler.class) == null) {
 84     ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
 85    } else {
 86     ctx.write(new ChunkedNioFile(file.getChannel()));
 87    }
 88    ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 89    if (!keepAlive) {
 90     future.addListener(ChannelFutureListener.CLOSE);
 91    }
 92 
 93    file.close();
 94   }
 95  }
 96 
 97  private static void send100Continue(ChannelHandlerContext ctx) {
 98   FullHttpResponse response = new DefaultFullHttpResponse(
 99     HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
100   ctx.writeAndFlush(response);
101  }
102 
103  @Override
104  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
105    throws Exception {
106   Channel incoming = ctx.channel();
107   System.out.println("Client:" + incoming.remoteAddress() + "异常");
108   
109   // 当出现异常就关闭连接
110   cause.printStackTrace();
111   ctx.close();
112  }
113 
114 }
 1 package com.ming.netty.http.websocket;
 2 
 3 import io.netty.channel.Channel;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.SimpleChannelInboundHandler;
 6 import io.netty.channel.group.ChannelGroup;
 7 import io.netty.channel.group.DefaultChannelGroup;
 8 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 9 import io.netty.util.concurrent.GlobalEventExecutor;
10 
11 public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>  {
12 
13     public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
14     
15     
16     
17     @Override
18     public void channelActive(ChannelHandlerContext ctx) throws Exception {
19           Channel incoming = ctx.channel();
20           System.out.println("Client:" + incoming.remoteAddress() + "在线");
21     }
22 
23 
24 
25     @Override
26     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
27         Channel incoming = ctx.channel();
28         System.out.println("Client:" + incoming.remoteAddress() + "掉线");
29     }
30 
31 
32     @Override
33     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
34           Channel incoming = ctx.channel();
35           for (Channel channel : channels) {
36            channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 加入"));
37           }
38           channels.add(ctx.channel());
39           System.out.println("Client:" + incoming.remoteAddress() + "加入");
40     }
41 
42 
43 
44     @Override
45     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
46         Channel incoming = ctx.channel();
47         for (Channel channel : channels) {
48           channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - "  + incoming.remoteAddress() + " 离开"));
49         }
50         System.out.println("Client:" + incoming.remoteAddress() + "离开");
51         channels.remove(ctx.channel());
52     }
53 
54     
55     
56     
57 
58 
59     @Override
60     protected void messageReceived(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
61         Channel incoming = ctx.channel();
62           for (Channel channel : channels) {
63            if (channel != incoming) {
64             //channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));
65            } else {
66             //返送给指定的
67             channel.writeAndFlush(new TextWebSocketFrame("[服务器端返回]:" + msg.text()));
68             
69         
70             //output current message to context. 
71             StringBuffer sb = new StringBuffer();
72             sb.append(incoming.remoteAddress()).append("->").append(msg.text());
73             System.out.println(sb.toString());
74            }
75           }
76         
77     }
78 
79 
80 
81     @Override
82     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
83         Channel incoming = ctx.channel();
84         System.out.println("Client:" + incoming.remoteAddress() + "异常");
85         // 当出现异常就关闭连接
86         cause.printStackTrace();
87         ctx.close();
88     }
89 
90 
91 
92     
93 
94     
95 
96 }
 1 <!DOCTYPE html>
 2 <html>
 3 <head>
 4 <meta charset="UTF-8">
 5 <title>WebSocket Chat</title>
 6 </head>
 7 <body>
 8     <script type="text/javascript">
 9         var socket;
10         if (!window.WebSocket) {
11             window.WebSocket = window.MozWebSocket;
12         }
13         if (window.WebSocket) {
14             socket = new WebSocket("ws://127.0.0.1:8500/ws");
15             socket.onmessage = function(event) {
16                 var ta = document.getElementById('responseText');
17                 ta.value = ta.value + '\n' + event.data
18             };
19             socket.onopen = function(event) {
20                 var ta = document.getElementById('responseText');
21                 ta.value = "连接开启!";
22             };
23             socket.onclose = function(event) {
24                 var ta = document.getElementById('responseText');
25                 ta.value = ta.value + "连接被关闭";
26             };
27         } else {
28             alert("你的浏览器不支持 WebSocket!");
29         }
30 
31         function send(message) {
32             if (!window.WebSocket) {
33                 return;
34             }
35             if (socket.readyState == WebSocket.OPEN) {
36                 socket.send(message);
37             } else {
38                 alert("连接没有开启.");
39             }
40         }
41     </script>
42     <form onsubmit="return false;">
43         <h3>WebSocket 聊天室:</h3>
44         <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>
45         <br> 
46         <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">
47         <input type="button" value="发送消息" onclick="send(this.form.message.value)">
48         <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">
49     </form>
50     <br> 
51     <br> 
52     
53 </body>
54 </html>

 

运行服务器,然后在浏览器输入:127.0.0.1:8500 就可以看见一个简单的聊天室效果了.

 

推荐阅读