1. 程式人生 > 其它 >使用Netty自定義實現Dubbo

使用Netty自定義實現Dubbo

使用Netty自定義實現Dubbo

設計目標:

  使用 Netty 實現一個簡單的 RPC 框架。

設計需求:

  模仿 Dubbo,消費者和提供者共同約定介面和協議,消費者遠端呼叫提供者的服務,提供者返回一個字串,消費者列印提供者返回的資料。

設計說明:

  網路通訊使用 Netty 4.1.20。

設計示圖:

程式碼示例:

公共介面 HelloService

/**
 * 客戶端與伺服器端公共介面
 *
 * @author LJT
 * @date 2021/11/23 13:42
 */
public interface HelloService {

    String sayHello(String mes);

}
HelloServiceImpl

/**
 * 伺服器端實現公共介面
 *
 * @author LJT
 * @date 2021/11/23 13:43
 */
public class HelloServiceImpl implements HelloService {

    private static int count = 0;

    // 服務端實現公共介面,重寫裡面的方法
    // 當收到客戶端的訊息時,返回響應的結果
    public String sayHello(String mes) {

        System.out.println("收到了客戶端的訊息=" + mes);

        
// 根據請求不同的mes,返回不同的響應結果 if (mes != null) { return "你好呀客戶端,我已經收到你的訊息【" + mes + "】第" + (++count) + "次"; } else { return "你好呀客戶端,我已經收到你的訊息了"; } } }
ServerBootstrap

/**
 * ServerBootstrap 會啟動一個服務提供者,即 NettyServer
 *
 * @author LJT
 * @date 2021/11/23 13:51
 
*/ public class ServerBootstrap { public static void main(String[] args) { // 啟動服務,繫結本機的地址與埠 NettyServer.startServer("127.0.0.1", 7000); } }
NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 完成NettyServer的初始化和啟動
 *
 * @author LJT
 * @date 2021/11/23 13:54
 */
public class NettyServer {

    // 接收外來的呼叫傳參
    public static void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    // 初始化NettyServer並呼叫啟動服務
    private static void startServer0(String hostName, int port) {

        // 服務端的兩個group,一個用來接收請求,一個用來處理邏輯
        // 不寫引數,預設是 cpu核數 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            
            // 配置啟動引數
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler()); // 自定義業務處理器
                        }
                    });

            // 回撥處理
            ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync();
            System.out.println("伺服器啟動成功,開始提供服務~~~");
            channelFuture.channel().closeFuture().sync();
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 最後關閉,防止浪費資源,要養成習慣
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 自定義伺服器處理器
 *
 * @author LJT
 * @date 2021/11/23 14:14
 */
// 這裡繼承 ChannelInboundHandlerAdapter 介面卡
// 不需要再去管其它事情,只需要把精力集中重寫需要的方法,處理業務邏輯
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        // 獲取客戶端傳送的訊息,列印訊息
        System.out.println("msg=" + msg);

        // 客戶端在呼叫伺服器的 api 時,我們需要定義一個協議
        // 比如,每次發訊息都必須是以某個字串開頭 “HelloService#hello#你好”
        // 在實際的開發中,類似二次解碼,提取出有用的資訊
        if (msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);

        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
ClientBootstrap

/**
 * 客戶端啟動引導
 *
 * @author LJT
 * @date 2021/11/23 15:16
 */
public class ClientBootstrap {

    // 這裡定義協議頭
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws Exception {

        // 建立一個消費者
        NettyClient customer = new NettyClient();

        // 建立代理物件,在實際的開發中,使用 spring 進行管理
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (; ; ) {
            Thread.sleep(2 * 1000);
            // 通過代理物件呼叫服務提供者的方法(服務)
            String res = service.sayHello("你好 dubbo~");
            System.out.println("呼叫的結果 res= " + res);
        }
    }
}
NettyClient

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * NettyClient的建立與初始化
 *
 * @author LJT
 * @date 2021/11/23 14:40
 */
public class NettyClient {

    // 建立執行緒池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    // 編寫方法使用代理模式,獲取一個代理物件
    // 在開發中,使用 spring 獲取,不需要太關注於底層細節
    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 進入...." + (++count) + " 次");
                    //{}  部分的程式碼,客戶端每呼叫一次 hello, 就會進入到該程式碼
                    if (client == null) {
                        initClient();
                    }

                    //設定要發給伺服器端的資訊
                    //providerName 協議頭 args[0] 就是客戶端呼叫api hello(???), 引數
                    client.setPara(providerName + args[0]);

                    return executor.submit(client).get();

                });
    }

    //初始化客戶端
    private static void initClient() {

        client = new NettyClientHandler();

        // 建立EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();

        // 配置初始化啟動引數
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            // 連線請求的伺服器地址與埠
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }  
      // 注意這裡不需要關閉 group
} }
NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 自定義客戶端處理器
 *
 * @author LJT
 * @date 2021/11/23 14:24
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的結果
    private String para; //客戶端呼叫方法時,傳入的引數

    // 與伺服器的連線建立後,就會被呼叫, 這個方法是第一個被呼叫
    // 確保它是活的
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被呼叫  ");
        context = ctx; //因為我們在其它方法會使用到 ctx
    }

    //收到伺服器的資料後,邏輯處理
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被呼叫  ");
        result = msg.toString();
        notify(); //喚醒等待的執行緒
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    // 被代理物件呼叫, 傳送資料給伺服器,-> wait -> 等待被喚醒(channelRead)
    // 注意這裡需要加上 synchronized 不然會報異常
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被呼叫  ");
        context.writeAndFlush(para);
        //進行wait
        wait(); //等待channelRead 方法獲取到伺服器的結果後,喚醒
        System.out.println(" call2 被呼叫  ");
        return result; //服務方返回的結果

    }

    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}

執行結果: