1. 程式人生 > 其它 >手寫RPC框架實現

手寫RPC框架實現

手寫RPC框架實現

參考視訊:自己動手實現RPC框架

1、什麼是RPC?

  • RPC, 英文全名remote procedure call 即遠端過程掉呼叫
  • 就是說一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的方法
  • 由於不在一個記憶體空間,不能直接呼叫,需要通過網路來表達呼叫的語義和傳達呼叫的資料
  • RPC就是要像呼叫本地的函式一樣去呼叫遠端函式

2、RPC架構

服務提供者(RPC Server): 執行在伺服器端,提供服務介面定義與服務實現類。

註冊中心(Registry): 執行在伺服器端,負責將本地服務釋出成遠端服務,管理遠端服務,提供給服務消費者使用。

服務消費者(RPC Client): 執行在客戶端,通過遠端代理物件呼叫遠端服務。

3、動手實現RPC框架

​ 實現RPC框架主要分成6個模組實現,分別是序列化模組、server模組、client模組、網路傳輸模組、協議模組、通用模組。

3.1 通用模組實現

通用模組主要提供各個模組公用的一個反射工具類,用來根據Class物件建立類的例項物件、獲得類的公共方法以及呼叫指定物件的指定方法。

ReflectionUtils.java

import java.lang.invoke.MethodHandle;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 反射工具類,用於建立例項和呼叫方法實現。
 * @date 2022/5/15 15:52
 */
public class ReflectionUtils {
    /**
     * 根據clazz建立物件
     * @param clazz 帶建立物件的類
     * @param <T> 物件型別
     * @return 建立好物件
     */
    public static <T> T newInstance(Class<T> clazz){
        try{
            return clazz.newInstance();
        }catch (Exception e){
            throw new IllegalStateException();
        }

    }

    /**
     * 獲取某個類clazz的公有方法
     * @param clazz
     * @return 當前類宣告的公有方法
     */
    public static Method[] getPublicMethods(Class clazz){
        Method[] methods = clazz.getDeclaredMethods();
        List<Method> pmethods = new ArrayList<>();
        for(Method m : methods){
            if(Modifier.isPublic(m.getModifiers())){
                pmethods.add(m);
            }
        }
        return pmethods.toArray(new Method[0]);
    }

    /**
     *
     * @param obj 被呼叫方法的物件
     * @param method 被呼叫的方法
     * @param args 方法的引數
     * @return 返回結果

     */
    public static Object invoke(Object obj, Method method, Object... args) {
        try{
            return method.invoke(obj, args);
        }catch (Exception e){
            throw new IllegalStateException();
        }

    }
}
3.2 序列化模組實現

序列化模組主要就是當客戶端呼叫服務端的介面方法時,需要傳入呼叫的具體方法和引數,因此需要通過底層的網路協議(如HTTP、TCP),傳輸到服務端,所以需要將資料序列化後才能在網路中進行傳輸,服務端接收後需要對傳輸過來的資料進行反序列化拿到具體的值。序列化的方式有很多,如java原生序列化、json序列化、Protobuff序列化等。

JSONDecoder.java

import com.alibaba.fastjson.JSON;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description fastjson實現反序列化
 * @date 2022/5/15 16:11
 */
public class JSONDecoder implements Decoder{
    @Override
    public <T> T decode(byte[] bytes, Class<T> clazz) {
        return JSON.parseObject(bytes, clazz);
    }
}

JSONEecoder.java

import com.alibaba.fastjson.JSON;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description fastjson實現序列化
 * @date 2022/5/15 16:11
 */
public class JSONEncoder implements Encoder{
    @Override
    public byte[] encode(Object obj) {
        return JSON.toJSONBytes(obj);
    }
}
3.3 協議模組

協議模組主要就是規定了客戶端傳輸的資料內容服務端響應的內容網路端點的資訊客戶端傳輸的資料內容主要包括請求的資訊(一個實體類,其中包括服務名、呼叫的具體函式名、引數型別、返回型別、版本號)、呼叫引數值。服務端響應的內容主要包括相應編碼(成功或失敗)、服務端資訊以及呼叫的服務端函式返回的資料。網路端點的資訊就包括主機名和埠號。

Peer.java

import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 表示網路傳輸的一個端點
 * @date 2022/5/15 15:09
 */
@Data
@AllArgsConstructor
public class Peer {
    private String host;
    private int port;
}

Request.java

import lombok.Data;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 請求服務
 * @date 2022/5/15 15:15
 */
@Data
public class Request {
    private ServiceDescriptor serviceDescriptor; // 請求的資訊,其中包括服務名(介面名)、呼叫的具體函式名、引數型別、返回型別、版本號
    private Object[] parameters;
}

Response.java

import lombok.Data;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 表示RPC的返回
 * @date 2022/5/15 15:15
 */
@Data
public class Response {
    /**
     *  服務返回編碼,0 成功,非0 失敗
     */
    private int code = 0;
    /**
     * 具體的錯誤資訊
     */
    private String message = "ok";
    /**
     *  返回的資料
     */
    private Object data;

}

ServiceDescriptor.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.lang.reflect.Method;
import java.util.Arrays;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 服務的描述,在服務端註冊服務和具體實現類的時候需要用到map,所以需要重寫hashcode 和 equals。 一個服務介面可以有多個不同版本的實現類。所以加入了version欄位。
 * @date 2022/5/15 15:15
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceDescriptor {
    public String clazz; // 服務介面
    private String method; // 方法名
    private String returnType; // 返回型別
    private String[] parameterTypes;// 引數型別
    private String version;// 服務版本號
    // 不帶版本號的介面描述
    public static ServiceDescriptor from(Class clazz, Method method){
        ServiceDescriptor sdp = new ServiceDescriptor();
        sdp.setClazz(clazz.getName());
        sdp.setMethod(method.getName());
        sdp.setReturnType(method.getReturnType().getName());
        Class[] parameterClasses = method.getParameterTypes();
        String[] parameterTypes = new String[parameterClasses.length];
        for (int i = 0; i < parameterClasses.length; i++) {
            parameterTypes[i] = parameterClasses[i].getName();
        }
        sdp.setParameterTypes(parameterTypes);
        return sdp;
    }
    // 帶版本號的介面描述
    public static ServiceDescriptor from(Class clazz,String version, Method method){
        ServiceDescriptor sdp = new ServiceDescriptor();
        sdp.setClazz(clazz.getName());
        sdp.setMethod(method.getName());
        sdp.setReturnType(method.getReturnType().getName());
        sdp.setVersion(version);
        Class[] parameterClasses = method.getParameterTypes();
        String[] parameterTypes = new String[parameterClasses.length];
        for (int i = 0; i < parameterClasses.length; i++) {
            parameterTypes[i] = parameterClasses[i].getName();
        }
        sdp.setParameterTypes(parameterTypes);
        return sdp;
    }
    @Override
    public boolean equals(Object o){
        if (this == o) return true;
        if(o == null || this.getClass() != o.getClass()) return  false;
        ServiceDescriptor that = (ServiceDescriptor) o;
        return this.toString().equals(that.toString());
    }
    @Override
    public int hashCode(){
        return toString().hashCode();
    }
    public String toString(){
        return "clazz=" + clazz + ", method" +method + ",returnType = " + returnType + ",parametersType=" + Arrays.toString(parameterTypes);
    }

}
3.4 網路傳輸模組

網路傳輸模組主要就是指定主機之間採用什麼網路協議進行通訊,這裡採用的是HTTP傳輸協議,客戶端向服務端發起連線請求,並傳輸資料,服務端利用jetty內嵌伺服器接收請求,並對相應的請求處理,呼叫具體的服務,將服務執行結果返回給消費端。

HTTPTransportClient.java

import com.rpc.Peer;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 基於HTTP實現的客戶端
 * @date 2022/5/15 19:38
 */
public class HTTPTransportClient implements TransportClient{
    private String url;

    @Override
    public void connect(Peer peer) {
        this.url = "http://" + peer.getHost() + ":" + peer.getPort();
    }

    @Override
    public InputStream write(InputStream data) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setDoOutput(true);
        connection.setDoInput(true);
        connection.setUseCaches(false);
        connection.connect();
        IOUtils.copy(data, connection.getOutputStream());
        int resultCode = connection.getResponseCode();
        if(resultCode == HttpURLConnection.HTTP_OK){
            return connection.getInputStream();
        }else{
            return connection.getErrorStream();
        }
    }

    @Override
    public void close() {

    }
}

HTTPTransportServer.java

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import lombok.extern.slf4j.Slf4j;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 基於http短連線實現的伺服器,就是dubbo中的HTTPserver
 * @date 2022/5/15 19:18
 */
@Slf4j
public class HTTPTransportServer implements TransportServer{
    private RequestHandler handler; // 請求處理器
    private Server server;
    @Override
    public void init(int port, RequestHandler handler) {
        this.handler = handler;
        this.server = new Server(port);
        // servlet接收請求
        ServletContextHandler ctx = new ServletContextHandler();
        server.setHandler(ctx);

        ServletHolder holder = new ServletHolder(new RequestServlet());
        ctx.addServlet(holder, "/*"); // 對所有的http請求處理

    }

    @Override
    public void start() {
        try {
            server.start();
            server.join();
        }catch (Exception e){
            log.error(e.getMessage(),e);
        }

    }

    @Override
    public void stop() {
        try {
            server.stop();
        } catch (Exception e) {
            log.error(e.getMessage(),e);
        }
    }
    class RequestServlet extends HttpServlet{
        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
            log.info("客戶端連線");
            InputStream in = req.getInputStream();
            OutputStream out = resp.getOutputStream();
            if(handler != null){
                handler.onRequest(in, out);
            }
            out.flush();

        }
    }
}

RequestHandler.java

import java.io.InputStream;
import java.io.OutputStream;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 處理網路請求的handler
 * @date 2022/5/15 16:33
 */
public interface RequestHandler {
    void onRequest(InputStream recive, OutputStream toResp);
}
3.5 消費端模組

消費端模組採用動態代理的方式,生成服務的代理物件,這樣可以使得消費端直接呼叫具體的方法,而不用去關心底層的細節,在動態代理的Invoker裡面去實現真實的服務具體呼叫。同時還實現了負載均衡,簡單的利用隨機選擇主機來獲取連線。

RandomTransportSelector.java

import com.rpc.Peer;
import com.rpc.ReflectionUtils;
import com.rpc.transport.TransportClient;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 負載均衡選擇器, 隨機選擇一個連線進行傳輸
 * @date 2022/5/15 20:10
 */
@Slf4j
public class RandomTransportSelector implements TransportSelector{
    private List<TransportClient> clients;
    public RandomTransportSelector(){
        this.clients = new ArrayList<>();
    }
    @Override
    public synchronized void init(List<Peer> peers, int count, Class<? extends TransportClient> clazz) {
        count = Math.max(count, 1);
        for(Peer peer: peers){
            for(int i = 0;i < count;i++){
                TransportClient client = ReflectionUtils.newInstance(clazz);
                client.connect(peer);
                clients.add(client);
            }
            log.info("connect server:{}",peer);
        }
    }

    @Override
    public synchronized TransportClient select() {
        int i = new Random().nextInt(clients.size());
        return clients.remove(i);
    }

    @Override
    public void release(TransportClient client) {
        clients.add(client);
    }

    @Override
    public void close() {
        for(TransportClient client : clients){
            client.close();
        }
        clients.clear();
    }
}

RemoteInvoker.java

import com.rpc.*;
import com.rpc.transport.TransportClient;
import org.apache.commons.io.IOUtils;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 動態代理的InvocationHandler 的實現類。
 * @date 2022/5/15 20:32
 */
public class RemoteInvoker implements InvocationHandler {
    private Class clazz;
    private Encoder encoder;
    private Decoder decoder;
    private TransportSelector selector;
    public RemoteInvoker(Class clazz, Encoder encoder, Decoder decoder, TransportSelector selector){
        this.clazz = clazz;
        this.encoder = encoder;
        this.decoder = decoder;
        this.selector = selector;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Request request = new Request();
        request.setServiceDescriptor(ServiceDescriptor.from(clazz, method));
        request.setParameters(args);
        Response resp = invokeRemote(request);
        if(resp == null || resp.getCode() != 0){
            throw new IllegalStateException("fail to invoke remote:" +resp);
        }
        return resp.getData();
    }

    private Response invokeRemote(Request request) {
        Response response = null;
        TransportClient client = null;
        try{
            client = selector.select();
            byte[] encode = encoder.encode(request);
            InputStream write = client.write(new ByteArrayInputStream(encode));
            byte[] bytes = IOUtils.readFully(write, write.available());
            response = decoder.decode(bytes, Response.class);
        }catch (IOException e){
            response = new Response();
            response.setCode(1);
            response.setMessage("Rpc error"+e.getClass()+":"+e.getMessage());
        }finally {
            if(client!=null){
                selector.release(client);
            }
        }
        return response;
    }
}

RpcClient.java

import com.rpc.Decoder;
import com.rpc.Encoder;
import com.rpc.ReflectionUtils;

import java.lang.reflect.Proxy;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 動態代理, 生成服務的代理物件
 * @date 2022/5/15 20:20
 */
public class RpcClient {
    private RpcClientConfig config;
    private Encoder encoder;
    private Decoder decoder;
    private TransportSelector selector;
    public RpcClient(){
        this(new RpcClientConfig());
    }
    public RpcClient(RpcClientConfig config){
        this.config = config;
        this.encoder = ReflectionUtils.newInstance(this.config.getEncoderClass());
        this.decoder = ReflectionUtils.newInstance(this.config.getDecoderClass());
        this.selector = ReflectionUtils.newInstance(this.config.getTransportSelector());
        this.selector.init(this.config.getServers(), this.config.getConnectCount(), this.config.getTransportClass());
    }
    public <T> T getProxy(Class<T> clazz){
        return (T) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{clazz}, new RemoteInvoker(clazz,encoder,decoder,selector));
    }
}

RpcClientConfig.java

import com.rpc.*;
import com.rpc.transport.HTTPTransportClient;
import com.rpc.transport.TransportClient;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description
 * @date 2022/5/15 20:06
 */
@Data

public class RpcClientConfig {
    public Class<? extends TransportClient> transportClass = HTTPTransportClient.class;
    public Class<? extends Encoder> encoderClass = JSONEncoder.class;
    public Class<? extends Decoder> decoderClass = JSONDecoder.class;
    public Class<? extends TransportSelector> transportSelector = RandomTransportSelector.class;
    private int connectCount = 1;
    private List<Peer> servers = Arrays.asList(new Peer("127.0.0.1", 3000));
}
3.6 服務端模組

服務端模組將所有的服務註冊到到一個map裡面,鍵為服務的描述,ServiceDescriptor物件,值為實現類ServiceInstance物件,這樣當獲取到消費端傳來的ServiceDescriptor物件的時候可以找到對應的具體實現類。因為ServiceDescriptor類作為Map的鍵,所以需要重寫ServiceDescriptor類的hashcode() 和 equals()方法。當服務端找到具體的實現類後,利用動態代理反射的機制去呼叫具體的方法,並將結果返回給消費端。

RpcServer.java

import com.rpc.*;
import com.rpc.transport.RequestHandler;
import com.rpc.transport.TransportServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description
 * @date 2022/5/15 21:17
 */
@Slf4j
public class RpcServer {
    private RpcServiceConfig config;
    private TransportServer net;
    private Encoder encoder;
    private Decoder decoder;
    private ServiceManager serviceManager;
    private ServiceInvoker serviceInvoker;
    private RequestHandler handler = new RequestHandler() {
        Response resp = new Response();
        @Override
        public void onRequest(InputStream recive, OutputStream toResp) {// 這裡其實就是HttpServerHandler
            try{
                byte[] inBytes = IOUtils.readFully(recive, recive.available());
                Request request = decoder.decode(inBytes, Request.class);
                log.info("get request: {}",request);
                ServiceInstance sis = serviceManager.lookup(request);
                Object res = serviceInvoker.invoke(sis, request);
                resp.setData(res);
            }catch (IOException e){
                log.warn(e.getMessage(),e);
                resp.setCode(1);
                resp.setMessage("RpcServer get error:"+e.getClass().getName()+":"+e.getMessage());
            }finally {
                byte[] outBytes = encoder.encode(resp);
                try{
                    toResp.write(outBytes);
                    log.info("response client");
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    };
    public RpcServer(RpcServiceConfig config){
        this.config = config;
        // net
        this.net = ReflectionUtils.newInstance(config.getTransportClass());
        this.net.init(config.getPort(), this.handler);
        // codec
        this.encoder = ReflectionUtils.newInstance(config.getEncoderClass());
        this.decoder = ReflectionUtils.newInstance(config.getDecoderClass());
        // Service
        this.serviceManager = new ServiceManager();
        this.serviceInvoker = new ServiceInvoker();
    }
    public void start(){
        this.net.start();
    }
    public void stop(){
        this.net.stop();
    }
    public <T> void register(Class<T> interfaceClass, T bean){
        serviceManager.register(interfaceClass, bean);
    }
    public RpcServer(){
        this(new RpcServiceConfig());
    }
}

RpcServiceConfig.java

import com.rpc.Decoder;
import com.rpc.Encoder;
import com.rpc.JSONDecoder;
import com.rpc.JSONEncoder;
import com.rpc.transport.HTTPTransportServer;
import com.rpc.transport.TransportServer;
import lombok.Data;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 服務配置類
 * @date 2022/5/15 21:13
 */
@Data
public class RpcServiceConfig {
    // 網路協議
    private Class<? extends TransportServer> transportClass = HTTPTransportServer.class;
    // 序列化
    private Class<? extends Encoder> encoderClass = JSONEncoder.class;
    // 反序列化
    private Class<? extends Decoder> decoderClass = JSONDecoder.class;

    private int port = 3000;

}

ServiceInstance.java

import lombok.AllArgsConstructor;
import lombok.Data;

import java.lang.reflect.Method;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 表示一個具體的服務
 * @date 2022/5/15 20:52
 */
@Data
@AllArgsConstructor
public class ServiceInstance {
    private Object target;
    private Method method;
}

ServiceInvoker.java

import com.rpc.ReflectionUtils;
import com.rpc.Request;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 呼叫具體的服務中的方法
 * @date 2022/5/15 21:19
 */
public class ServiceInvoker {
    public Object invoke(ServiceInstance serviceInstance, Request request){
        return ReflectionUtils.invoke(serviceInstance.getTarget(),
                serviceInstance.getMethod(),
                request.getParameters());
    }
}

ServiceManager.java

import com.rpc.ReflectionUtils;
import com.rpc.Request;
import com.rpc.ServiceDescriptor;
import lombok.extern.slf4j.Slf4j;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Wenbo
 * @version 1.0
 * @program
 * @description 服務管理類, 管理rpc暴露的服務
 * @date 2022/5/15 20:51
 */
@Slf4j
public class ServiceManager { // 這個其實就是LocalRegister方法, 將服務名和實現類儲存在map裡面, 可以根據服務名(介面名)找對應的實現類。
    private Map<ServiceDescriptor, ServiceInstance> services;
    public ServiceManager(){
        services = new ConcurrentHashMap<>();
    }
    public <T> void register(Class<T> interfaceClass, T bean){ // bean是實現類的物件
        Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
        for(Method method : methods){
            ServiceInstance sis = new ServiceInstance(bean, method);
            ServiceDescriptor sdp  = ServiceDescriptor.from(interfaceClass, method);
            services.put(sdp, sis);
//            System.out.println("方法個數" + services.size());
            log.info("register service: {} {}",sdp.getClazz(), sdp.getMethod());
        }
    }
    // 支援版本號的rpc
    public <T> void register(Class<T> interfaceClass,String version, T bean){
        Method[] methods = ReflectionUtils.getPublicMethods(interfaceClass);
        for(Method method:methods){
            ServiceInstance sis = new ServiceInstance(bean, method);
            ServiceDescriptor sdp = ServiceDescriptor.from(interfaceClass,version,method);

            services.put(sdp,sis);
            log.info("register service: {} {}",sdp.getClazz(), sdp.getMethod());

        }
    }

    public ServiceInstance lookup(Request request){
        ServiceDescriptor sdp = request.getServiceDescriptor();
        return services.get(sdp);
    }


}
最終執行結果

服務端啟動:

消費端呼叫: