1. 程式人生 > 實用技巧 >netty之 -- 手寫rpc框架

netty之 -- 手寫rpc框架

接下來手寫一個簡陋的rpc框架,首先分析一下呼叫流程

話不多說,直接上程式碼:

一個公共介面,相當於protobuf協議中的proto檔案

package com.yang.java.main.netty.rpc.publicInterface;

/**
 * Service contract shared by the provider and the consumer.
 * The provider supplies an implementation; the consumer calls it through a proxy
 * created for this interface.
 *
 * @author mark
 * Date 2020/9/16
 */
public interface PublicInterface {

    /**
     * Passes a text message to the service and returns the service's text reply.
     *
     * @param msg the message to hand to the service
     * @return the reply produced by the implementation
     */
    String hello(String msg);
}

服務端實現:

實現宣告的介面

package com.yang.java.main.netty.rpc.provider;

import com.yang.java.main.netty.rpc.publicInterface.PublicInterface;

/**
 * Provider-side implementation of {@link PublicInterface}.
 *
 * @author mark
 * Date 2020/9/16
 */
public class PublicInterfaceImpl implements PublicInterface {

    /**
     * Prints the received message to standard output and answers with the same
     * message behind a fixed prefix.
     *
     * @param msg the message received from the consumer
     * @return {@code "provider receive the message: "} followed by {@code msg}
     */
    @Override
    public String hello(String msg) {
        System.out.println("receive from customer: " + msg);
        final String reply = "provider receive the message: " + msg;
        return reply;
    }
}

netty的服務端

package com.yang.java.main.netty.rpc.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * Description: * * @author mark * Date 2020/9/16 */ public class NettyServer { // 學習一下netty的命名方法 public static void startServer(String host, int port){ startServer0(host, port); } private static void startServer0(String host, int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //業務處理器 } } ); ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync(); System.out.println("provider is start ~~"); channelFuture.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.yang.java.main.netty.rpc.netty;

import com.yang.java.main.netty.rpc.provider.PublicInterfaceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Server-side business handler: treats every inbound message as the argument
 * of {@code hello} and writes the returned string back to the consumer.
 *
 * @author mark
 * Date 2020/9/16
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // One service instance per handler instead of a new one for every message.
    private final PublicInterfaceImpl service = new PublicInterfaceImpl();

    /**
     * Invokes the service with the received message and sends the result back.
     *
     * @param ctx the context of the channel the message arrived on
     * @param msg the inbound message; its {@code toString()} value is the service argument
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Take the message sent by the consumer and call the service with it.
        System.out.println("msg: " + msg);
        String result = service.hello(msg.toString());
        ctx.writeAndFlush(result);
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the context of the failing channel
     * @param cause the error that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Print the cause first; closing without it hides why the connection was dropped.
        cause.printStackTrace();
        ctx.close();
    }
}
package com.yang.java.main.netty.rpc.provider;

import com.yang.java.main.netty.rpc.netty.NettyServer;

/**
 * Entry point of the provider process: starts the Netty server on the local
 * endpoint.
 *
 * @author mark
 * Date 2020/9/16
 */
public class ProviderBootstrap {

    /** Address the provider binds to. */
    private static final String HOST = "127.0.0.1";

    /** Port the provider binds to. */
    private static final int PORT = 7000;

    /**
     * Starts the provider.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        NettyServer.startServer(HOST, PORT);
    }
}

netty的客戶端

package com.yang.java.main.netty.rpc.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Consumer-side entry point: creates dynamic proxies whose method calls are
 * sent to the provider over a lazily created Netty connection.
 *
 * The shared handler is created lazily without synchronization and carries a
 * single pending argument, so one proxy call at a time is the supported usage.
 *
 * @author mark
 * Date 2020/9/16
 */
public class NettyClient {

    /** Address of the provider. */
    private static final String HOST = "127.0.0.1";

    /** Port of the provider. */
    private static final int PORT = 7000;

    // Thread pool that runs the blocking call() of the handler.
    private static final ExecutorService executor =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // Shared handler; stays null until a connection has actually been established.
    private static NettyClientHandler client;

    // Number of remote invocations made through proxies of this instance (logging only).
    private int count = 0;

    /**
     * Creates a proxy for the given service interface. Each interface method
     * call sends its single argument to the provider and returns the reply.
     *
     * @param providerClass the service interface to proxy
     * @return a proxy implementing {@code providerClass}
     */
    public Object getBean(final Class<?> providerClass) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{providerClass}, (proxy, method, args) -> {
                    // toString/hashCode/equals are also routed to this handler; answer them
                    // locally instead of sending them to the provider (their args may be null).
                    if (method.getDeclaringClass() == Object.class) {
                        switch (method.getName()) {
                            case "equals":
                                return proxy == args[0];
                            case "hashCode":
                                return System.identityHashCode(proxy);
                            default:
                                return providerClass.getName() + "$RpcProxy@"
                                        + Integer.toHexString(System.identityHashCode(proxy));
                        }
                    }
                    // The wire format carries exactly one argument; args is null for no-arg methods.
                    if (args == null || args.length != 1) {
                        throw new UnsupportedOperationException(
                                "only single-argument methods are supported: " + method);
                    }

                    System.out.println("(proxy, method, args) come " + (++count) + "time");
                    if (client == null) {
                        initClient();
                    }
                    client.setParas((String) args[0]);
                    return executor.submit(client).get();  // this ends up running call()
                });
    }

    /**
     * Connects to the provider and publishes the handler for later calls.
     * The handler is published only after the connection succeeded, so a failed
     * attempt leaves {@code client} null and the next call retries.
     *
     * @throws IllegalStateException if the connection cannot be established
     */
    private static void initClient() {
        final NettyClientHandler handler = new NettyClientHandler();
        // Create the EventLoopGroup.
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(handler);
                            }
                        }
                );

        try {
            bootstrap.connect(HOST, PORT).sync();
        } catch (Exception e) {
            // Release the event loop threads; nothing else would ever shut them down.
            group.shutdownGracefully();
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException(
                    "failed to connect to provider at " + HOST + ":" + PORT, e);
        }
        client = handler;
    }
}
package com.yang.java.main.netty.rpc.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * Client-side handler that doubles as a {@link Callable}: {@link #call()}
 * writes the pending argument to the channel and blocks until
 * {@link #channelRead} delivers the reply.
 *
 * All shared state is guarded by this object's monitor.
 *
 * @author mark
 * Date 2020/9/16
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    // Upper bound for waiting on channelActive; an arbitrary but finite 5 seconds.
    private static final long ACTIVE_TIMEOUT_NANOS = 5_000_000_000L;

    // Channel context, cached because call() needs it to write.
    private ChannelHandlerContext context;

    // The reply of the most recent request.
    private String result;

    // The argument supplied by the caller for the next request.
    private String paras;

    // True once the reply for the current request has been stored in result.
    private boolean responseReady;

    // First pipeline error seen; once set, the handler refuses further calls.
    private Throwable failure;

    /**
     * Sends the pending argument and waits for the reply.
     *
     * @return the reply text
     * @throws IllegalStateException if the channel does not become active in
     *         time or fails before the reply arrives
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        awaitActive();
        System.out.println("call before wait");
        responseReady = false;
        context.writeAndFlush(paras);
        // Guarded wait: a wakeup alone does not prove that the reply has arrived.
        while (!responseReady && failure == null) {
            wait();
        }
        if (!responseReady) {
            throw new IllegalStateException("channel failed while waiting for the response", failure);
        }
        System.out.println("call after wait");
        return result;
    }

    /**
     * Blocks until channelActive has stored the context. The caller can get
     * here first: the sample run logs "set paras" before "channelActive".
     */
    private synchronized void awaitActive() throws InterruptedException {
        final long deadline = System.nanoTime() + ACTIVE_TIMEOUT_NANOS;
        while (context == null && failure == null) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                // wait(0) would block forever, so anything below 1 ms counts as timed out.
                throw new IllegalStateException("channel did not become active in time");
            }
            wait(remainingMillis);
        }
        if (failure != null) {
            throw new IllegalStateException("channel is unusable after an earlier failure", failure);
        }
    }

    /** Caches the context and wakes a caller that is waiting for the channel. */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        context = ctx;
        notifyAll();
    }

    /** Stores the reply and wakes the caller blocked in {@link #call()}. */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        result = msg.toString();
        responseReady = true;
        notifyAll();
    }

    /**
     * Records the error, wakes any blocked caller so it fails instead of
     * hanging, and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        synchronized (this) {
            if (failure == null) {
                failure = cause;
            }
            notifyAll();
        }
        ctx.close();
    }

    /**
     * Sets the argument that the next {@link #call()} sends.
     *
     * @param paras the argument text
     */
    public void setParas(String paras) {
        System.out.println("set paras");
        this.paras = paras;
    }
}
package com.yang.java.main.netty.rpc.consumer;

import com.yang.java.main.netty.rpc.netty.NettyClient;
import com.yang.java.main.netty.rpc.publicInterface.PublicInterface;

/**
 * Entry point of the consumer process: obtains a proxy for
 * {@link PublicInterface} and calls the provider once per second.
 *
 * @author mark
 * Date 2020/9/16
 */
public class ClientBootstrap {

    /**
     * Calls the remote {@code hello} service in a loop until the thread is
     * interrupted.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {

        // Create a consumer.
        NettyClient customer = new NettyClient();

        // Create the proxy object.
        PublicInterface provider = (PublicInterface) customer.getBean(PublicInterface.class);

        for (;;) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Interruption is a request to stop: restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
            // Call the provider's method (service) through the proxy.
            String res = provider.hello("hello rpc~");
            System.out.println("result: " + res);
        }
    }
}

結果分析,符合預期

(proxy, method, args) come 1time
set paras
channelActive
call before wait
channelRead
call after wait
result: provider receive the message: hello rpc~
(proxy, method, args) come 2time
set paras
call before wait
channelRead
call after wait
result: provider receive the message: hello rpc~

簡要步驟如下:

  1. 服務消費方(customer)以本地呼叫方式呼叫服務
  2. customer stub(grpc一般這樣簡寫) 接收到呼叫後負責將方法、引數等封裝成能夠進行網路傳輸的訊息體
  3. customer stub 將訊息進行編碼並發送到服務端
  4. provider stub 收到訊息後進行解碼
  5. provider stub 根據解碼結果呼叫本地的服務
  6. 本地服務執行並將結果返回給 provider stub
  7. provider stub 將返回結果進行編碼並發送至消費方
  8. customer stub 接收到訊息並進行解碼
  9. 服務消費方(customer)得到結果

原始碼地址