1. 程式人生 > 其它 >Netty-使用netty實現群聊-程式碼實現

Netty-使用netty實現群聊-程式碼實現

技術標籤:Nettynetty

1、功能介紹

和之前的NIO實現群聊機制一樣,這裡使用Netty實現:

  • 1、當有新使用者上線時,通知其他使用者上線,下線同理
  • 2、使用者x傳送一條訊息,伺服器端轉發給其他所有線上伺服器

2、程式碼結構

在這裡插入圖片描述

3、GroupChatServer

package groupchat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.
socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import java.beans.Encoder; public class GroupChatServer { //監聽埠 private int port; public GroupChatServer(int port){ this.port = port; } //編寫run方法,處理客戶端的請求
public void run() throws Exception{ //建立兩個執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //其中NioEventLoppGroup預設八個執行緒 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //建立ServerBootstrap,伺服器啟動引導類 ServerBootstrap b =
new ServerBootstrap(); //設定ServerBootstrap相關引數 b.group(bossGroup, workerGroup)//roup函式設定兩個EventLoop .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() {//指定相關handler @Override protected void initChannel(SocketChannel ch) throws Exception { //獲取pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline中加入解碼器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline中加入編碼器 pipeline.addLast("encoder", new StringDecoder()); //加入自己的業務處理handler,需要自己寫!關鍵部分 pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("Netty伺服器已啟動"); ChannelFuture channelFuture = b.bind(port).sync(); //監聽關閉事件 channelFuture.channel().closeFuture().sync(); }finally { //關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } //主函式,也需要丟擲異常 public static void main(String[] args) throws Exception{ //啟動一個伺服器 new GroupChatServer(7000); } }

4、GroupChatServerHandler

package groupchat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.SimpleTimeZone;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    //定義一個channelle組,管理所有的channel
    //GlobalEventExecutor.INSTANCE是全域性的時間執行器,是一個單例,幫助執行channelGroup
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //輸出接收到訊息的時間用
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //當連線建立之後,第一個被執行,將當前的channel加入到channelGroup
    @Override
    public  void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        Channel channel = ctx.channel();
        //將該客戶端加入聊天的訊息  傳送給其他的線上客戶端
        //writeAndFlush方法會自動遍歷所有的channel,併發送括號內訊息
        //最後加入當前訊息傳送的時間
        channelGroup.writeAndFlush("客戶端" + channel.remoteAddress() + "加入聊天\n"
                + sdf.format(new java.util.Date()) + "\n");
        //channelGroup中加入該channel
        channelGroup.add(channel);
    }

    //斷開連線,觸發該通知給當前線上的客戶
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("客戶端" + channel.remoteAddress() + "離開聊天了\n");
        System.out.println("測試用:"+"channelGroup_size=" + channelGroup.size() );
    }

    //channel出於活動狀態,提示xxx已上線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.channel().remoteAddress() + "上線");
    }

    //提示xxx離線
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.channel().remoteAddress() + "離線了");
    }

    //讀取資料並轉發資料給當前線上的其他客戶端
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        //獲取當前channel
        //給自己傳送的訊息是不一樣的
        final Channel channel = channelHandlerContext.channel();
        //根據不同的情況,需要會送不同的訊息
        channelGroup.forEach(ch -> {
            if(channel != ch)
                ch.writeAndFlush("[客戶]" + channel.remoteAddress() + "傳送了訊息:" + s + "\n");
            else
                ch.writeAndFlush("[自己]" + "傳送了訊息:" + s + "\n");
        });
    }

    //發生異常時
    @Override
    public void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception{
        //關閉通道
        var1.close();
    }

}

5、GroupChatClient

package groupchat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import javax.naming.event.EventContext;
import java.awt.*;
import java.util.Scanner;

public class GroupChatClient {
    //主機地址
    private final String host;
    //埠
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected  void initChannel(SocketChannel ch) throws Exception{
                            //得到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //加入相關handler
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            //加入自定義handler
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host,port).sync();
            //得到當前channel
            Channel channel = channelFuture.channel();
            System.out.println("......" + channel.localAddress() + "......");

            //客戶端需要輸入資訊,使用Scanner掃描器
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNextLine()){
                String msg = scanner.nextLine();
                //將訊息通過channel傳送給伺服器端
                channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            eventExecutors.shutdownGracefully();
        }
    }

    //客戶端主函式
    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1",7000).run();
    }
}

6、GroupChatClientHandler

package groupchat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

7、測試方法

  • 1)新建一個Maven工程,匯入netty依賴
  • 2)按照2結構搭建程式碼
  • 3)執行伺服器端GroupChatServer
  • 4)執行一個客戶端GroupChatClient,再切換埠執行新的客戶端,進行測試,收發訊息/通知與否

8、遇到的問題

遇到Exception異常Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:53853
應該是IP或者埠設定錯了,嘗試設定一下本地計算機