1、引入dependencie
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</depndency>
2、定义websocket服务类 WebSocketServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
@Component
public class WebSocketServer {
private static final Logger logger= LoggerFactory.getLogger(wang.aweb.childcar.websocketserver.WebSocketServer.class);
public void start(InetSocketAddress address){
EventLoopGroup gatewayGroup=new NioEventLoopGroup(1);
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap bootstrap=new ServerBootstrap()
.group(gatewayGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(address)
.childHandler(new WebSocketServerChannelInitializer())
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future=bootstrap.bind(address).sync();
logger.info("server start listen at "+address.getPort());
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
gatewayGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
3、初始化websocket服务 WebSocketServerChannelInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
public class WebSocketServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(new HttpObjectAggregator(8192));
channel.pipeline().addLast(new WebSocketServerProtocolHandler("/","Websocket",true,65536*10));
channel.pipeline().addLast(new WebSocketServerHandler());
}
}
4、具体业务逻辑 WebSocketServerHandler
import com.auth0.jwt.JWT;
import com.auth0.jwt.JWTVerifier;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.exceptions.JWTDecodeException;
import com.auth0.jwt.exceptions.TokenExpiredException;
import com.auth0.jwt.interfaces.DecodedJWT;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import springfox.documentation.spring.web.json.Json;
import wang.aweb.childcar.common.Constant;
import wang.aweb.childcar.exception.CarException;
import wang.aweb.childcar.exception.CarExceptionEnum;
import wang.aweb.childcar.model.pojo.Member;
import wang.aweb.childcar.tcpserver.ActionHandler;
import wang.aweb.childcar.websocketserver.WebSocketServer;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger= LoggerFactory.getLogger(WebSocketServer.class);
/**
* 管理一个全局map,保存连接进服务端的通道数量
* @param ctx
*/
public static ConcurrentHashMap<ChannelId,ChannelHandlerContext> CHANNEL_MAP=new ConcurrentHashMap<>();
/**
* 用户ID=>channel
*/
public static ConcurrentHashMap<Integer, String> channelMap = new ConcurrentHashMap<>();
/**
* 上线一个设备,绑定设备身份
*
* @param channelId
* @param userId
*/
public void bind(String channelId, Integer userId) {
if(channelMap.containsKey(userId)){
//logger.info("客户端【"+channelId+"】是连接状态,连接通道数量:"+CHANNEL_MAP.size());
}else{
//保存连接
channelMap.put(userId,channelId);
}
}
/**
* 有用户进入
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx){
System.out.println("有用户进入------->");
InetSocketAddress insocket=(InetSocketAddress)ctx.channel().remoteAddress();
String clientIp=insocket.getAddress().getHostAddress();
int clientPort=insocket.getPort();
//获取连接信道唯一标识
ChannelId channelId=ctx.channel().id();
System.out.println();
if(CHANNEL_MAP.containsKey(channelId)){
logger.info("客户端【"+channelId+"】是连接状态,连接通道数量:"+CHANNEL_MAP.size());
}else{
//保存连接
CHANNEL_MAP.put(channelId,ctx);
logger.info("客户端【"+channelId+"】连接netty服务器[ip:"+clientIp+"---->port:"+clientPort+"]");
logger.info("连接通道数量"+CHANNEL_MAP.size());
}
}
/**
* 有用户退出
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx){
InetSocketAddress insocket=(InetSocketAddress) ctx.channel().remoteAddress();
String clientIp=insocket.getAddress().getHostAddress();
ChannelId channelId=ctx.channel().id();
//包含此客户端才去删除
if(CHANNEL_MAP.containsKey(channelId)){
CHANNEL_MAP.remove(channelId);
System.out.println();
logger.info("客户端【"+channelId+"】退出netty服务器[ip:"+clientIp+"---->port:"+insocket.getPort()+"]");
logger.info("连接通道数量"+CHANNEL_MAP.size());
Integer userId=(Integer) getKey(channelMap,channelId.toString());
channelMap.remove(userId);
//
}
}
public static Object getKey(Map map, Object value){
List<Object> keyList = new ArrayList<>();
for(Object key: map.keySet()){
if(map.get(key).equals(value)){
keyList.add(key);
}
}
return keyList;
}
/**
* 有客户端发送消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
System.out.println("接收消息......");
logger.info("加载客户端报文......");
ObjectMapper mapper=new ObjectMapper();
Map<String,Object> tmpMap=mapper.readValue(((TextWebSocketFrame)msg).text(),Map.class);
System.out.println(tmpMap);
System.out.println(ctx.channel().remoteAddress()+"---->收到的信息:"+((TextWebSocketFrame)msg).text());
if(tmpMap.containsKey("type") && tmpMap.containsKey("token") && tmpMap.containsKey("cmd")){
String token=tmpMap.get("token").toString();
Algorithm algorithm=Algorithm.HMAC256(Constant.JWT_TOKEN);
JWTVerifier jwtVerifier= JWT.require(algorithm).build();
Member currentMember=null;
try{
DecodedJWT jwt=jwtVerifier.verify(token);
currentMember=new Member();
currentMember.setId(jwt.getClaim(Constant.USER_ID).asInt());
}catch (TokenExpiredException e){
throw new CarException(CarExceptionEnum.TOKEN_EXPIRED);
}catch (JWTDecodeException e){
throw new CarException(CarExceptionEnum.TOKEN_WRONG);
}catch (Exception e){
throw new CarException(CarExceptionEnum.SYSTEM_ERROR);
}
this.channelWrite(ctx.channel().id(),msg);
}
else{
return;
}
}
/**
* 服务端给客户端发送消息
* @param channelId 联通信道唯一ID
* @param msg 需要发送的内容
* @return :void
*/
public void channelWrite(ChannelId channelId,Object msg) throws Exception{
ChannelHandlerContext ctx=CHANNEL_MAP.get(channelId);
if(ctx==null){
logger.info("通道【"+channelId+"】不存在");
return;
}
if(msg==null || msg==""){
logger.info("服务端响应空的消息");
return;
}
ctx.write(msg);
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
System.out.println(cause);
ctx.close();
logger.info(ctx.channel().id()+"发生了错误,此链接已关闭"+"此时连通数量:"+CHANNEL_MAP.size());
}
}
4、入口修改
@Value("${netty.port}")
private int nettyPort;
@Value("${netty.url}")
private String nettyUrl;
@Value("${websocket.port}")
private int webSocketPort;
@Value("${websocket.url}")
private String webSocketUrl;
@Autowired
private WebSocketServer webSocketServer;
@Autowired
private NettyServer server;
public void run(String... args) throws Exception{
InetSocketAddress address=new InetSocketAddress(nettyUrl,nettyPort);
//System.out.println("run .... . ... "+nettyUrl);
server.start(address);
InetSocketAddress webSocketAddress=new InetSocketAddress(webSocketUrl,webSocketPort);
//System.out.println("run .... . ..."+webSocketUrl);
webSocketServer.start(webSocketAddress);
}
到此,websocket就可以随http服务一起启动了。
评论