netty服務器
Netty配置
管道配置
自定義handler
推送消息接口及實現類
測試
學過 Netty 的都知道,Netty 對 NIO 進行了很好的封裝,簡單的 API,龐大的開源社區。深受廣大程序員喜愛。基于此本文分享一下基礎的 netty 使用。實戰制作一個 Netty + websocket 的消息推送小栗子。
netty服務器
@Component
publicclassNettyServer{
staticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class);
/**
*端口號
*/
@Value("${webSocket.netty.port:8888}")
intport;
EventLoopGroupbossGroup;
EventLoopGroupworkGroup;
@Autowired
ProjectInitializernettyInitializer;
@PostConstruct
publicvoidstart()throwsInterruptedException{
newThread(()->{
bossGroup=newNioEventLoopGroup();
workGroup=newNioEventLoopGroup();
ServerBootstrapbootstrap=newServerBootstrap();
//bossGroup輔助客戶端的tcp連接請求,workGroup負責與客戶端之前的讀寫操作
bootstrap.group(bossGroup,workGroup);
//設置NIO類型的channel
bootstrap.channel(NioServerSocketChannel.class);
//設置監聽端口
bootstrap.localAddress(newInetSocketAddress(port));
//設置管道
bootstrap.childHandler(nettyInitializer);
//配置完成,開始綁定server,通過調用sync同步方法阻塞直到綁定成功
ChannelFuturechannelFuture=null;
try{
channelFuture=bootstrap.bind().sync();
log.info("Serverstartedandlistenon:{}",channelFuture.channel().localAddress());
//對關閉通道進行監聽
channelFuture.channel().closeFuture().sync();
}catch(InterruptedExceptione){
e.printStackTrace();
}
}).start();
}
/**
*釋放資源
*/
@PreDestroy
publicvoiddestroy()throwsInterruptedException{
if(bossGroup!=null){
bossGroup.shutdownGracefully().sync();
}
if(workGroup!=null){
workGroup.shutdownGracefully().sync();
}
}
}
基于 Spring Boot + MyBatis Plus + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
視頻教程:https://doc.iocoder.cn/video/
Netty配置
管理全局Channel以及用戶對應的channel(推送消息)
publicclassNettyConfig{
/**
*定義全局單利channel組管理所有channel
*/
privatestaticvolatileChannelGroupchannelGroup=null;
/**
*存放請求ID與channel的對應關系
*/
privatestaticvolatileConcurrentHashMapchannelMap=null;
/**
*定義兩把鎖
*/
privatestaticfinalObjectlock1=newObject();
privatestaticfinalObjectlock2=newObject();
publicstaticChannelGroupgetChannelGroup(){
if(null==channelGroup){
synchronized(lock1){
if(null==channelGroup){
channelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
}
}
returnchannelGroup;
}
publicstaticConcurrentHashMapgetChannelMap(){
if(null==channelMap){
synchronized(lock2){
if(null==channelMap){
channelMap=newConcurrentHashMap<>();
}
}
}
returnchannelMap;
}
publicstaticChannelgetChannel(StringuserId){
if(null==channelMap){
returngetChannelMap().get(userId);
}
returnchannelMap.get(userId);
}
}
基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 實現的后臺管理系統 + 用戶小程序,支持 RBAC 動態權限、多租戶、數據權限、工作流、三方登錄、支付、短信、商城等功能
項目地址:https://gitee.com/zhijiantianya/yudao-cloud
視頻教程:https://doc.iocoder.cn/video/
管道配置
@Component publicclassProjectInitializerextendsChannelInitializer{ /** *webSocket協議名 */ staticfinalStringWEBSOCKET_PROTOCOL="WebSocket"; /** *webSocket路徑 */ @Value("${webSocket.netty.path:/webSocket}") StringwebSocketPath; @Autowired WebSocketHandlerwebSocketHandler; @Override protectedvoidinitChannel(SocketChannelsocketChannel)throwsException{ //設置管道 ChannelPipelinepipeline=socketChannel.pipeline(); //流水線管理通道中的處理程序(Handler),用來處理業務 //webSocket協議本身是基于http協議的,所以這邊也要使用http編解碼器 pipeline.addLast(newHttpServerCodec()); pipeline.addLast(newObjectEncoder()); //以塊的方式來寫的處理器 pipeline.addLast(newChunkedWriteHandler()); pipeline.addLast(newHttpObjectAggregator(8192)); pipeline.addLast(newWebSocketServerProtocolHandler(webSocketPath,WEBSOCKET_PROTOCOL,true,65536*10)); //自定義的handler,處理業務邏輯 pipeline.addLast(webSocketHandler); } }
自定義handler
@Component @ChannelHandler.Sharable publicclassWebSocketHandlerextendsSimpleChannelInboundHandler{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyServer.class); /** *一旦連接,第一個被執行 */ @Override publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{ log.info("有新的客戶端鏈接:[{}]",ctx.channel().id().asLongText()); //添加到channelGroup通道組 NettyConfig.getChannelGroup().add(ctx.channel()); } /** *讀取數據 */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{ log.info("服務器收到消息:{}",msg.text()); //獲取用戶ID,關聯channel JSONObjectjsonObject=JSONUtil.parseObj(msg.text()); Stringuid=jsonObject.getStr("uid"); NettyConfig.getChannelMap().put(uid,ctx.channel()); //將用戶ID作為自定義屬性加入到channel中,方便隨時channel中獲取用戶ID AttributeKey key=AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(uid); //回復消息 ctx.channel().writeAndFlush(newTextWebSocketFrame("服務器收到消息啦")); } @Override publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{ log.info("用戶下線了:{}",ctx.channel().id().asLongText()); //刪除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.info("異常:{}",cause.getMessage()); //刪除通道 NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); ctx.close(); } /** *刪除用戶與channel的對應關系 */ privatevoidremoveUserId(ChannelHandlerContextctx){ AttributeKey key=AttributeKey.valueOf("userId"); StringuserId=ctx.channel().attr(key).get(); NettyConfig.getChannelMap().remove(userId); } }
推送消息接口及實現類
publicinterfacePushMsgService{
/**
*推送給指定用戶
*/
voidpushMsgToOne(StringuserId,Stringmsg);
/**
*推送給所有用戶
*/
voidpushMsgToAll(Stringmsg);
}
@Service
publicclassPushMsgServiceImplimplementsPushMsgService{
@Override
publicvoidpushMsgToOne(StringuserId,Stringmsg){
Channelchannel=NettyConfig.getChannel(userId);
if(Objects.isNull(channel)){
thrownewRuntimeException("未連接socket服務器");
}
channel.writeAndFlush(newTextWebSocketFrame(msg));
}
@Override
publicvoidpushMsgToAll(Stringmsg){
NettyConfig.getChannelGroup().writeAndFlush(newTextWebSocketFrame(msg));
}
}
測試

鏈接服務器


發送消息


調用接口,往前端推送消息!


OK!
一個簡單的 netty 小栗子就完成了。
-
接口
+關注
關注
33文章
9519瀏覽量
157020 -
封裝
+關注
關注
128文章
9248瀏覽量
148614 -
服務器
+關注
關注
14文章
10251瀏覽量
91480
原文標題:Spring Boot+Netty+Websocket實現后臺向前端推送信息
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
基于多路復用模型的Netty框架
基于阿里云移動推送的移動應用推送模式最佳實踐
如何實現服務器自動推送消息?
怎么去理解netty
怎樣使用springboot整合netty來開發一套高性能的通信系統呢
Springboot整合netty框架實現終端、通訊板子(單片機)TCP/UDP通信案例
Netty如何實現消息推送
Netty如何做到單機百萬并發?
jdk17下netty導致堆內存瘋漲原因排查
netty推送消息接口及實現
評論