Springboot + Netty + Websocket实现聊天(弹幕功能)

netty服务启动

public class WebsocketServer {

    private int port;

    public WebsocketServer(int port) {
        this.port = port;
    }

    public  void startNetty() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(2); // (1) 接收进来的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(3); // 处理已经被接收的连接
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2) 启动 NIO 服务的辅助启动类
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3) 一个新的 Channel 如何接收进来的连接
                    // .childHandler(new WebsocketDanmuServerInitializer())  // (4)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("http-decodec", new HttpRequestDecoder());
                            pipeline.addLast("http-aggregator", new HttpObjectAggregator(65536));
                            pipeline.addLast("http-encodec", new HttpResponseEncoder());
                            pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                            pipeline.addLast("WebSocket-protocol", new WebSocketServerProtocolHandler("/ws"));
                            pipeline.addLast("WebSocket-request", new TextWebSocketFrameHandler());
                        }
                    })  // (4) 配置一个 Channel
                    .option(ChannelOption.SO_BACKLOG, 128) // (5) 提供给NioServerSocketChannel用来接收进来的连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) 配置参数

            System.out.println("websocketServer 启动了" + port);

            // 绑定端口,开始接收进来的连接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服务器  socket 关闭 。
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("websocketServer 关闭了");
        }

    }
}

收到消息的处理

/**
 * 处理TextWebSocketFrame
 *
 */
public class TextWebSocketFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {
   
   public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

   // 。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
   @Override
   protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) throws Exception { // (1)
      Channel incoming = ctx.channel();
      for (Channel channel : channels) {
            if (channel != incoming){
            channel.writeAndFlush(new TextWebSocketFrame(msg.text()));
         } else {
            channel.writeAndFlush(new TextWebSocketFrame("我发送的"+msg.text()));
         }
        }
   }

   // 每当从服务端收到新的客户端连接时,客户端的 Channel 存入ChannelGroup列表中,并通知列表中的其他客户端 Channel
   @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)
        Channel incoming = ctx.channel();
        
        // 向多个管道广播消息
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));
        
        channels.add(incoming);
      System.out.println("Client:"+incoming.remoteAddress() +"加入");
    }

    // 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)
        Channel incoming = ctx.channel();
        
        // 向多个管道广播消息
        channels.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));
        
      System.err.println("Client:"+incoming.remoteAddress() +"离开");

        // A closed Channel is automatically removed from ChannelGroup,
        // so there is no need to do "channels.remove(ctx.channel());"
    }

    // 客户端连接时候会被channelActive监听到
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
        Channel incoming = ctx.channel();
      System.out.println("Client:"+incoming.remoteAddress()+"在线");
   }

   // 客户端断开时会被channelInactive监听到
   @Override
   public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
        Channel incoming = ctx.channel();
      System.err.println("Client:"+incoming.remoteAddress()+"掉线");
   }

   // 异常监听 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)    // (7)
         throws Exception {
       Channel incoming = ctx.channel();
      System.err.println("Client:"+incoming.remoteAddress()+"异常");
            // 当出现异常就关闭连接
            cause.printStackTrace();
        ctx.close();
   }

}

源码地址:https://github.com/zhi-re/nettyproject

最后修改:2020 年 06 月 03 日 03 : 55 PM