springboot+netty搭建网络编程websocket服务器

1、引入依赖(dependency)

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>

2、定义websocket服务类 WebSocketServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
 * Netty-based WebSocket server bootstrap, registered as a Spring component.
 */
@Component
public class WebSocketServer {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * Binds the WebSocket server to {@code address} and blocks the calling thread
     * until the server channel is closed.
     *
     * <p>Both event loop groups are always shut down when this method returns,
     * whether the server stopped normally, failed to bind, or was interrupted.
     * Failures are logged rather than propagated.
     *
     * @param address the local address (host and port) to listen on
     */
    public void start(InetSocketAddress address) {
        // One thread accepts connections; the default-sized group handles channel I/O.
        EventLoopGroup gatewayGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(gatewayGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketServerChannelInitializer())
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(address).sync();
            logger.info("server start listen at " + address.getPort());
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("websocket server at {} interrupted, shutting down", address, e);
        } catch (Exception e) {
            logger.error("websocket server at {} failed", address, e);
        } finally {
            // Release the event loop threads on every exit path, not only on failure.
            gatewayGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

3、初始化websocket服务 WebSocketServerChannelInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
 * Builds the handler pipeline for every accepted WebSocket connection.
 */
public class WebSocketServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    /** Maximum aggregated HTTP content length, in bytes. */
    private static final int MAX_HTTP_CONTENT_LENGTH = 8192;
    /** Maximum WebSocket frame payload length, in bytes. */
    private static final int MAX_FRAME_PAYLOAD_LENGTH = 65536 * 10;

    /**
     * Installs, in order: the UTF-8 string encoder, the HTTP codec, the HTTP
     * message aggregator, the WebSocket protocol handler and the business handler.
     *
     * @param channel the newly accepted channel
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new HttpServerCodec())
                .addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                // Path "/", subprotocol "Websocket", extensions allowed.
                .addLast(new WebSocketServerProtocolHandler(
                        "/", "Websocket", true, MAX_FRAME_PAYLOAD_LENGTH))
                .addLast(new WebSocketServerHandler());
    }
}

4、具体业务逻辑 WebSocketServerHandler

import com.auth0.jwt.JWT;
import com.auth0.jwt.JWTVerifier;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.exceptions.JWTDecodeException;
import com.auth0.jwt.exceptions.TokenExpiredException;
import com.auth0.jwt.interfaces.DecodedJWT;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import springfox.documentation.spring.web.json.Json;
import wang.aweb.childcar.common.Constant;
import wang.aweb.childcar.exception.CarException;
import wang.aweb.childcar.exception.CarExceptionEnum;
import wang.aweb.childcar.model.pojo.Member;
import wang.aweb.childcar.tcpserver.ActionHandler;
import wang.aweb.childcar.websocketserver.WebSocketServer;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Business handler for WebSocket connections: tracks connected channels,
 * validates the JWT carried by each text message and echoes accepted messages
 * back to the sender.
 */
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketServerHandler.class);

    /** Shared JSON mapper, reused instead of being rebuilt for every message. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Global registry of connected channels: channel id =&gt; handler context.
     */
    public static ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();

    /**
     * User id =&gt; channel id (as text).
     */
    public static ConcurrentHashMap<Integer, String> channelMap = new ConcurrentHashMap<>();

    /**
     * Binds a user to a channel. An existing binding for the same user is kept.
     *
     * @param channelId textual id of the channel
     * @param userId    id of the user that owns the channel
     */
    public void bind(String channelId, Integer userId) {
        // Atomic replacement for the former containsKey-then-put sequence.
        channelMap.putIfAbsent(userId, channelId);
    }

    /**
     * Registers a newly connected channel.
     *
     * @param ctx context of the channel that became active
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        logger.debug("有用户进入------->");
        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = remote.getAddress().getHostAddress();
        int clientPort = remote.getPort();
        // Unique identifier of this connection.
        ChannelId channelId = ctx.channel().id();
        if (CHANNEL_MAP.putIfAbsent(channelId, ctx) != null) {
            logger.info("客户端【" + channelId + "】是连接状态,连接通道数量:" + CHANNEL_MAP.size());
            return;
        }
        logger.info("客户端【" + channelId + "】连接netty服务器[ip:" + clientIp + "---->port:" + clientPort + "]");
        logger.info("连接通道数量" + CHANNEL_MAP.size());
    }

    /**
     * Unregisters a disconnected channel and drops every user binding that
     * points at it.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        InetSocketAddress remote = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = remote.getAddress().getHostAddress();
        ChannelId channelId = ctx.channel().id();
        // Only clean up for channels that were actually registered.
        if (CHANNEL_MAP.remove(channelId) == null) {
            return;
        }
        logger.info("客户端【" + channelId + "】退出netty服务器[ip:" + clientIp + "---->port:" + remote.getPort() + "]");
        logger.info("连接通道数量" + CHANNEL_MAP.size());
        // getKey returns a list of keys, so it must not be cast to Integer;
        // remove the matching bindings directly instead.
        String channelKey = channelId.toString();
        for (Map.Entry<Integer, String> binding : channelMap.entrySet()) {
            if (channelKey.equals(binding.getValue())) {
                channelMap.remove(binding.getKey(), channelKey);
            }
        }
    }

    /**
     * Collects every key of {@code map} whose value equals {@code value}.
     *
     * @param map   the map to search
     * @param value the value to look for
     * @return a {@code List<Object>} holding the matching keys (empty when none match)
     */
    public static Object getKey(Map<?, ?> map, Object value) {
        List<Object> keyList = new ArrayList<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object candidate = entry.getValue();
            if (candidate != null && candidate.equals(value)) {
                keyList.add(entry.getKey());
            }
        }
        return keyList;
    }

    /**
     * Handles a message sent by a client. Text frames carrying a JSON object
     * with {@code type}, {@code token} and {@code cmd} are authenticated and
     * echoed back; other text frames are dropped. Non-text messages are passed
     * to the next handler.
     *
     * @param ctx context of the receiving channel
     * @param msg the inbound message
     * @throws Exception when the payload is not valid JSON or the token is rejected
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof TextWebSocketFrame)) {
            // Not ours to interpret; let the rest of the pipeline deal with it.
            ctx.fireChannelRead(msg);
            return;
        }
        TextWebSocketFrame frame = (TextWebSocketFrame) msg;
        // Tracks whether ownership of the frame moved to the outbound path.
        boolean handedOff = false;
        try {
            logger.info("加载客户端报文......");
            String text = frame.text();
            // The payload carries the token, so only its size is logged.
            logger.debug("{} ---->收到的信息, length={}", ctx.channel().remoteAddress(), text.length());

            @SuppressWarnings("unchecked") // Generic JSON object; values are read as Object.
            Map<String, Object> payload = MAPPER.readValue(text, Map.class);
            boolean complete = payload != null
                    && payload.containsKey("type")
                    && payload.containsKey("token")
                    && payload.containsKey("cmd");
            if (!complete) {
                return;
            }
            // The authenticated member is not used further yet; the call only validates the token.
            authenticate(String.valueOf(payload.get("token")));
            handedOff = write(ctx.channel().id(), frame);
        } finally {
            if (!handedOff) {
                // This adapter does not release messages automatically.
                frame.release();
            }
        }
    }

    /**
     * Verifies the JWT and builds the member it identifies.
     *
     * @param token the raw JWT string
     * @return a member carrying the user id claim of the token
     * @throws CarException when the token is expired, malformed or otherwise rejected
     */
    private Member authenticate(String token) throws CarException {
        Algorithm algorithm = Algorithm.HMAC256(Constant.JWT_TOKEN);
        JWTVerifier jwtVerifier = JWT.require(algorithm).build();
        try {
            DecodedJWT jwt = jwtVerifier.verify(token);
            Member currentMember = new Member();
            currentMember.setId(jwt.getClaim(Constant.USER_ID).asInt());
            return currentMember;
        } catch (TokenExpiredException e) {
            logger.warn("token expired", e);
            throw new CarException(CarExceptionEnum.TOKEN_EXPIRED);
        } catch (JWTDecodeException e) {
            logger.warn("token could not be decoded", e);
            throw new CarException(CarExceptionEnum.TOKEN_WRONG);
        } catch (Exception e) {
            logger.warn("token verification failed", e);
            throw new CarException(CarExceptionEnum.SYSTEM_ERROR);
        }
    }

    /**
     * Sends a message from the server to a client.
     *
     * @param channelId unique id of the target channel
     * @param msg       the content to send
     * @throws Exception kept for compatibility with existing callers
     */
    public void channelWrite(ChannelId channelId, Object msg) throws Exception {
        write(channelId, msg);
    }

    /**
     * Writes and flushes {@code msg} to the registered channel.
     *
     * @param channelId unique id of the target channel
     * @param msg       the content to send
     * @return {@code true} when the message was handed to the channel,
     *         {@code false} when it was dropped
     */
    private boolean write(ChannelId channelId, Object msg) {
        ChannelHandlerContext target = CHANNEL_MAP.get(channelId);
        if (target == null) {
            logger.info("通道【" + channelId + "】不存在");
            return false;
        }
        // Compare by content; the previous == check only matched the interned literal.
        boolean emptyText = msg instanceof CharSequence && ((CharSequence) msg).length() == 0;
        if (msg == null || emptyText) {
            logger.info("服务端响应空的消息");
            return false;
        }
        target.writeAndFlush(msg);
        return true;
    }

    /**
     * Logs the failure and closes the affected channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("channel " + ctx.channel().id() + " failed", cause);
        ctx.close();
        logger.info(ctx.channel().id() + "发生了错误,此链接已关闭" + "此时连通数量:" + CHANNEL_MAP.size());
    }

}

5、入口修改

// Port and host for the address passed to NettyServer.start; injected from
// the netty.port and netty.url properties.
@Value("${netty.port}")
    private int nettyPort;
    @Value("${netty.url}")
    private String nettyUrl;
    // Port and host for the address passed to WebSocketServer.start; injected
    // from the websocket.port and websocket.url properties.
    @Value("${websocket.port}")
    private int webSocketPort;
    @Value("${websocket.url}")
    private String webSocketUrl;
    // The WebSocket server started from run().
    @Autowired
    private WebSocketServer webSocketServer;
    // The server started from run() on the netty address.
    // NOTE(review): NettyServer is not shown here; presumably the TCP server — confirm.
    @Autowired
    private NettyServer server;
    /**
     * Starts both servers, each on its own thread, and returns immediately.
     *
     * <p>{@code WebSocketServer.start} blocks until its server channel closes.
     * Calling the servers one after another on this thread would therefore
     * stall application startup, and the second server would never start if
     * the first call blocks as well.
     *
     * @param args command line arguments (unused)
     * @throws Exception kept for compatibility with the overridden contract
     */
    public void run(String... args) throws Exception{
        final InetSocketAddress address=new InetSocketAddress(nettyUrl,nettyPort);
        final InetSocketAddress webSocketAddress=new InetSocketAddress(webSocketUrl,webSocketPort);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    server.start(address);
                } catch (Exception e) {
                    // Surface the failure through the thread's uncaught-exception handler.
                    throw new IllegalStateException("server failed on " + address, e);
                }
            }
        }, "netty-server").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                webSocketServer.start(webSocketAddress);
            }
        }, "netty-websocket-server").start();
    }

到此,websocket就可以随http服务一起启动了。

评论

(= ̄ω ̄=)··· 暂无内容!

回复

您还未登录,请先登录或者注册