Netty實現管理端主動向客戶端推送資料
阿新 • • 發佈:2021-02-05
1.需求分析:
跑一個定時任務每十秒從資料庫中查詢一次資料,存到redis中,然後通過netty將資料推送到每一個連線的客戶端
2.具體實現
2.1定義一個NettyServer用來配置netty中channel的一些配置資訊,及對資料任務的處理
@Component
public class NettyServer {
private static Logger logger = LoggerFactory.getLogger(NettyServer.class);
@Autowired
private ServerHandler serverHandler;
public static ServerSocketChannel serverSocketChannel;
public void start(int port) throws Exception {
// 連線處理group
EventLoopGroup boss = new NioEventLoopGroup();
// 事件處理group
EventLoopGroup worker = new NioEventLoopGroup();
//1.建立ServerBootStrap例項
ServerBootstrap bootstrap = new ServerBootstrap();
// 繫結處理group
//2.設定並繫結Reactor執行緒池:EventLoopGroup,EventLoop就是處理所有註冊到本執行緒的Selector上面的Channel
bootstrap.group(boss, worker)
//3.設定並繫結服務端的channel
.channel(NioServerSocketChannel.class)
// 保持連線數
.option (ChannelOption.SO_BACKLOG, 1024)
// 有資料立即傳送
.option(ChannelOption.TCP_NODELAY, true)
// 保持連線
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 處理新連線
//設定了客戶端連線socket屬性。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 增加任務處理
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new HttpServerCodec());
//支援寫大資料流
pipeline.addLast(new ChunkedWriteHandler());
//http聚合器
pipeline.addLast(new HttpObjectAggregator(1024*62));
//websocket支援,設定路由
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast( // 自定義的處理器
serverHandler);
}
});
// 繫結埠,同步等待成功
ChannelFuture future;
try {
logger.info("netty伺服器在[{}]埠啟動監聽",port);
//真正讓netty跑起來的重點
future = bootstrap.bind(port).sync();
if (future.isSuccess()) {
serverSocketChannel = (ServerSocketChannel) future.channel();
logger.info("netty服務開啟成功");
} else {
logger.info("netty服務開啟失敗");
}
// 等待服務監聽埠關閉,就是由於這裡會將執行緒阻塞,導致無法傳送資訊,所以我這裡開了執行緒
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 優雅地退出,釋放執行緒池資源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2.2定義一個ServerHandler,繼承SimpleChannelInboundHandler,用來做與使用者端連線,及連線後對channel的資料處理,
注:@ChannelHandler.Sharable 因為一個ChannelHandler可以從屬於多個ChannelPipeline,所以它也可以繫結到多個ChannelHandlerContext例項。用於這種用法的ChannelHandler必須要使用@Sharable註解標註;否則,試圖將它新增到多個ChannelPipeline時將會觸發異常。為了安全地被用於多個併發的Channel(即連線),這樣的ChannelHandler必須是執行緒安全的。
@Component
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
/**
* 在與客戶端的連線已經建立之後將被呼叫
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//獲取每個使用者端連線的ip
InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
logger.info("netty客戶端與服務端連線開始...");
ChannelMapServer.addChannel(clientIp,ctx.channel());
ctx.writeAndFlush("連線成功");
}
// /**
// * 當從客戶端接收到一個訊息時被呼叫
// * msg 就是硬體傳送過來的資料資訊
// */
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
// }
/**
* 客戶端與服務端斷開連線時呼叫
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("netty客戶端與服務端連線關閉...");
InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress();
String clientIp = ipSocket.getAddress().getHostAddress();
ChannelMapServer.removeChannelByName(clientIp);
}
// /**
// * 服務端接收客戶端傳送過來的資料結束之後呼叫
// */
// @Override
// public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//
// }
/**
* 在處理過程中引發異常時被呼叫
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
System.out.println("異常資訊:rn " + cause.getMessage());
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
System.out.println("伺服器端收到訊息" + textWebSocketFrame.text());
channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("receive client msg:"+ textWebSocketFrame.text()));
}
}
2.3ChannelMapServer,定義一個map結合來儲存每一個連線進來的客戶端,然後向每一個連線進來的客戶端定時推送訊息
public class ChannelMapServer {
public static ConcurrentHashMap<String, Channel> channelHashMap=null;
/**
* 獲取ConcurrentHashMap
*/
public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
return channelHashMap;
}
/**
* 獲取指定name的channel
*/
public static Channel getChannelByName(String name){
if(channelHashMap==null||channelHashMap.isEmpty()){
return null;
}
return channelHashMap.get(name);
}
/**
* 將通道中的訊息推送到每一個客戶端
*/
public static boolean pushNewsToAllClient(String obj){
if(channelHashMap==null||channelHashMap.isEmpty()){
return false;
}
for(String name: channelHashMap.keySet()) {
Channel channel = channelHashMap.get(name);
channel.writeAndFlush(new TextWebSocketFrame(obj));
}
return true;
}
/**
* 將channel和對應的name新增到ConcurrentHashMap
*/
public static void addChannel(String name,Channel channel){
if(channelHashMap==null){
channelHashMap=new ConcurrentHashMap<>(128);
}
channelHashMap.put(name,channel);
}
/**
* 移除掉name對應的channel
*/
public static String removeChannelByName(String name){
if(channelHashMap.containsKey(name)){
channelHashMap.remove(name);
return Constants.CHANNEL_NAME_SUCCESS;
}
return Constants.CHANNEL_NAME_FAILURE;
}
}
2.4在Application中配置專案啟動時即執行
@SpringBootApplication
@EnableTransactionManagement
@EnableScheduling
@MapperScan("com.XXX.YYYYYY.mapper")
@ComponentScan(basePackages = {"com.XXX"})
public class Application implements CommandLineRunner {
@Autowired
private NettyServer nettyServer;
public static void main(String[] args) {
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
nettyServer.start(8082);
}
}
3.測試,使用postjson進行線上測試
可以看到netty啟動成功
這是執行定時任務,就能在postjson中打印出定時任務執行的資料