1. 程式人生 > >Java實現RPC(服務物件使用註解並自動注入)

Java實現RPC(服務物件使用註解並自動注入)

  • 使用到的技術:
  1. 註解和反射機制
  2. 包掃描以及jar包掃描
  3. CGlib動態代理
  4. 類似於spring框架的控制反轉依賴自動注入技術
  • 目錄結構:
  • RPCclass註解
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface RPCclass {
        boolean auto() default true;
    }
  •  RPCmethod註解
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface RPCmethod {
        String remoteName();
    }
  • RPClock
    public interface RPClock {
        public void wakeUpLock();
    }
  • RPCMethodDefinition
    import java.lang.reflect.Method;
    
    public class RPCMethodDefinition {
        //要通過反射呼叫的類
        private Class klass;
        //要通過反射執行方法的物件
        private Object object;
        //要執行的方法
        private Method method;
    
        public Class getKlass() {
            return klass;
        }
    
        public RPCMethodDefinition setKlass(Class klass) {
            this.klass = klass;
            return this;
        }
    
        public Object getObject() {
            return object;
        }
    
        public RPCMethodDefinition setObject(Object object) {
            this.object = object;
            return this;
        }
    
        public Method getMethod() {
            return method;
        }
    
        public RPCMethodDefinition setMethod(Method method) {
            this.method = method;
            return this;
        }
    }
  • RPCProxy
    import com.google.gson.Gson;
    import com.mec.rpc.annotation.RPCmethod;
    import com.mec.rpc.exception.MethodNotHaveRPCAnnotation;
    import com.mec.rpc.exception.RPCNotFoundProxyObject;
    import com.mec.rpc.exception.RPCProxyReadOutTimeException;
    import com.mec.uitl.GsonUitl;
    import net.sf.cglib.proxy.Enhancer;
    import net.sf.cglib.proxy.MethodInterceptor;
    import net.sf.cglib.proxy.MethodProxy;
    
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.lang.reflect.Method;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    
    public class RPCProxy {
        //RPC伺服器IP
        private String serverIp;
        //RPC伺服器埠
        private int port;
        //可以將物件序列化(實際上是轉為json字串)的一種工具
        private static Gson gson;
        //設定超時時間
        private int waitingTime;
        //RPC伺服器返回結果後通知的物件(這個是我自己實際工程中的應用,可以忽略)
        private Map<Object, RPClock> wakeUpMap;
    
        static {
            //相當於gson = new GsonBuilder().create();
            gson = GsonUitl.gson;
        }
    
        public RPCProxy(String serverIp, int port, int waitingTime) {
            this.serverIp = serverIp;
            this.port = port;
            this.waitingTime = waitingTime;
            this.wakeUpMap = new HashMap<Object, RPClock>();
        }
    
        public String getServerIp() {
            return serverIp;
        }
    
        public int getport() {
            return port;
        }
    
        public RPCProxy setwaitingTime(int waitingTime) {
            this.waitingTime = waitingTime;
            return this;
        }
    
        //通過Object取得代理(由於客戶端要通過RPC呼叫的方法是已知的,因此不存在再執行過程中動態代理)
        public <T> T getProxy(Object object) {
            return getProxy(object.getClass());
        }
    
        //通過Class取得代理
        public <T> T getProxy(Class klass) {
            //這是CGlib代理的一般步驟
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(klass);
            enhancer.setCallback(new MethodInterceptor() {
                //這是通過代理物件執行方法時真正執行的程式碼
                public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
                    //因為本人工程的需要要避免對hashCode、equals和toSting方法的遠端呼叫
                    if(method.getName().equals("hashCode") || method.getName().equals("equals") || method.getName().equals("toString")) {
                       return methodProxy.invokeSuper(o, objects);
                    }
    
                    //要求要被代理的方法必須帶有RPCmethod註解,否則丟擲異常
                    if(!method.isAnnotationPresent(RPCmethod.class)) {
                        throw new MethodNotHaveRPCAnnotation("方法[" + method + "]沒有RPCmethod註解");
                    }
    
                    //取得被代理方法的上的註解
                    RPCmethod rpcMethod = method.getAnnotation(RPCmethod.class);
                    //取得被代理方法的遠端呼叫方法的名字
                    String remoteName = rpcMethod.remoteName();
                    //將每個引數序列化並依次裝入列表裡
                    ArrayList<String> parameter = new ArrayList();
                    int parameterCount = objects.length;
                    for(int i = 0; i < parameterCount; i++) {
                        parameter.add(gson.toJson(objects[i]));
                    }
    
                    //和RPC伺服器建立通訊通道
                    Socket socket = new Socket(serverIp, port);
                    DataInputStream dis = new DataInputStream(socket.getInputStream());
                    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    
                    //初始化時間控制執行緒
                    Object lock = new Object();
                    ControlThread controlThread = new ControlThread(lock, dis, dos);
                    synchronized (lock) {
                        new Thread(controlThread).start();
                        lock.wait();
                    }
    
                    //首先發送遠端呼叫方法的remoteName
                    dos.writeUTF(remoteName);
                    //如果引數大於0,則把引數列表序列化併發送
                    if(parameterCount > 0) {
                        //序列化為json字串
                        String str = gson.toJson(parameter);
                        //將字串轉換為二進位制傳送,需先發送一個頭部表明需要接收的長度
                        byte[]  bytes = str.getBytes();
                        dos.writeInt(bytes.length);
                        dos.write(bytes);
                    }
    
                    Object result = null;
                    try {
                        //等待RPC伺服器返回結果
                        int length = dis.readInt();
                        byte[] bytes = new byte[length];
                        dis.readFully(bytes, 0 , length);
    
                         String reStr = new String(bytes);
                        if (reStr.equals("null")) {
                            return null;
                        }
    
                        result = gson.fromJson(reStr.trim(), method.getGenericReturnType());
                    } catch (Exception e) {
                        //發生異常有三種情況,
                        //一、無法和伺服器建立連線
                        //二、通訊過程中和伺服器斷開連線
                        //三、伺服器處理請求超時
                        e.printStackTrace();
                        throw new RPCProxyReadOutTimeException("向伺服器【" + serverIp + "】請求資料超時");
                    } finally {
                        //無論是否成功都要通知相應的RPClock
                        RPClock rpClock = wakeUpMap.get(o);
                        if(rpClock != null) {
                            rpClock.wakeUpLock();
                        }
    
                        //關閉輸入通道
                        synchronized (dis) {
                            try {
                                dis.close();
                            } catch (IOException e) {
                            }
                        }
    
                        //關閉輸出通道
                        synchronized (dos) {
                            try {
                                dos.close();
                            } catch (IOException e) {
                            }
                        }
    
                        //關閉套接字
                        try {
                            socket.close();
                        } catch (IOException e) {
                        }
                    }
                    return result;
                }
            });
    
            Object cglibProxy  = enhancer.create();
            //每一個代理物件對應一個RPClock
            wakeUpMap.put(cglibProxy, null);
            return (T) cglibProxy;
        }
    
        //通過新增RPClock的方法
        public void putRPClock(Object object, RPClock rpcLock) throws RPCNotFoundProxyObject {
            boolean found = false;
            for(Object o: wakeUpMap.keySet()) {
                if(o.equals(object)) {
                    found = true;
                }
            }
    
            if(!found) {
                throw new RPCNotFoundProxyObject("引數" + object + "未在" + this +"代理中找到");
            }
    
            wakeUpMap.put(object, rpcLock);
        }
    
        //遠端呼叫時間控制執行緒
        private class ControlThread implements Runnable {
            private DataInputStream dis;
            private DataOutputStream dos;
            private Object lock;
    
            public ControlThread(Object lock, DataInputStream dis, DataOutputStream dos) {
                this.lock = lock;
                this.dis = dis;
                this.dos = dos;
            }
    
            public void run() {
                synchronized (lock) {
                    lock.notify();
                }
    
                try {
                    Thread.sleep(waitingTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                //超出時間以後直接若socket未關閉則強制關閉
                try {
                    synchronized (dis) {
                        dis.close();
                    }
                    synchronized (dos) {
                        dos.close();
                    }
                } catch (IOException e) {
                }
            }
        }
    }
    
  • RPCServer
    import com.google.gson.Gson;
    import com.mec.rpc.annotation.RPCclass;
    import com.mec.rpc.annotation.RPCmethod;
    import com.mec.rpc.exception.ClassNotHaveRPCAnnotation;
    import com.mec.rpc.exception.RPCMethodParametersLengthMismatching;
    import com.mec.rpc.exception.RPCServiceNotRegister;
    import com.mec.rpc.exception.RPCServiceObjectNotLoad;
    import com.mec.uitl.GsonUitl;
    import com.mec.uitl.PackageScanner;
    import com.mec.uitl.TypeUitl;
    
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.lang.reflect.Type;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Map;
    
    public class RPCServer implements Runnable {
        //用來裝可以被遠端呼叫的方法
        private Map<String ,RPCMethodDefinition> serviceMap;
        //控制伺服器偵聽客戶端執行緒的變數
        private volatile boolean goon;
        private ServerSocket serverSocket;
        //json序列化工具
        private static Gson gson;
        //用來得到傳過來的實參
        private static Type type;
    
    
        static {
            gson = GsonUitl.gson; //        gson = new GsonBuilder().create();
            type = TypeUitl.type; //        type = new TypeToken<ArrayList<String>>() {}.getType();
        }
    
        public RPCServer() {
            serviceMap = new HashMap<String ,RPCMethodDefinition>();
            goon = false;
        }
    
        //啟動RPC伺服器
        public void start(int port) throws IOException {
            serverSocket = new ServerSocket(port);
    
            goon = true;
            new Thread(this).start();
        }
    
        //關閉伺服器
        public void stop() {
            if(serverSocket == null || serverSocket.isClosed()) {
                return;
            }
            goon = false;
            try {
                serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                serverSocket = null;
            }
        }
    
    
        //伺服器主執行緒,偵聽到就啟動一個執行緒去處理RPC請求
        public void run() {
            while (goon) {
                try {
                    Socket socket = serverSocket.accept();
                    new Thread(new DealRequest(socket)).start();
                } catch (IOException e) {
                    if(goon) {
                        e.printStackTrace();
                    } else {
                        break;
                    }
                }
            }
        }
    
    
        //處理遠端掉用的請求並返回結果
        private class DealRequest implements Runnable {
            private Socket socket;
            private DataOutputStream dos;
            private DataInputStream dis;
    
            public DealRequest(Socket socket) {
                this.socket = socket;
                try {
                    dis = new DataInputStream(socket.getInputStream());
                    dos = new DataOutputStream(socket.getOutputStream());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            //這裡的讀和寫與RPCProxy寫和讀一一對應
            public void run() {
                try {
                    //讀取遠端呼叫的remoteName
                    String remoteName = dis.readUTF();
                    //從RPC服務集合裡取出對應的RPCMethodDefinition
                    RPCMethodDefinition rdf = serviceMap.get(remoteName);
                    if(rdf == null) {
                        throw new RPCServiceNotRegister("服務" + remoteName + "未註冊");
                    }
    
                    Object object = rdf.getObject();
    
                    if(object == null) {
                        throw new RPCServiceObjectNotLoad("服務" + remoteName + "物件未載入");
                    }
    
                    Method method = rdf.getMethod();
                    Type[] parameters = method.getGenericParameterTypes();
                    int parameterCount = parameters.length;
                    Object result = null;
    
                    if(parameterCount <= 0) {
                        //如果引數為0,直接反射執行該方法
                        result = method.invoke(object);
                    } else {
                        //如果引數不為0.先接收序列化的實參,再進行反序列化
                        int length = dis.readInt();
                        byte[] bytes = new byte[length];
                        dis.readFully(bytes, 0 , length);
                        String parameter = new String(bytes);
                        ArrayList<String> parameterList = gson.fromJson(parameter, type);
                        if(parameterCount != parameterList.size()) {
                            throw new RPCMethodParametersLengthMismatching("服務" + remoteName + "引數個數不匹配" + parameterCount + " " + parameterList.size());
                        }
    
                        Object[] argus = new Object[parameterCount];
    
                        for (int i = 0; i < parameterCount; i++) {
                            argus[i] = gson.fromJson(parameterList.get(i), parameters[i]);
                        }
                        result = method.invoke(object, argus);
    
                    }
    
                    byte[] bytes= gson.toJson(result).getBytes();
                    dos.writeInt(bytes.length);
                    dos.write(bytes);
                    //確認資料全部發送完畢之後關閉相應資源
                    dos.flush();
            } catch (IOException e) {
                    e.printStackTrace();
                } catch (RPCServiceNotRegister rpcServiceNotRegister) {
                    rpcServiceNotRegister.printStackTrace();
                } catch (RPCServiceObjectNotLoad rpcServiceObjectNotLoad) {
                    rpcServiceObjectNotLoad.printStackTrace();
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                } catch (RPCMethodParametersLengthMismatching rpcMethodParametersLengthMismatching) {
                    rpcMethodParametersLengthMismatching.printStackTrace();
                } finally {
                    try {
                        dis.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    try {
                        dos.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
            }
        }
    
        //自動掃描指定包下的所有類,當檢測到帶有RPCclass註解的類時實行自動裝入
        public void scanPackage(String path) {
            //自制的包掃描工具,可以遍歷指定包下的所有類
            new PackageScanner() {
                public void dealClass(Class<?> klass) {
                    try {
                        if (klass.isAnnotationPresent(RPCclass.class)) {
                            addService(klass);
                        }
                    } catch (ClassNotHaveRPCAnnotation classNotHaveRPCAnnotation) {
                        return;
                    }
                }
            }.packageScanner(path);
        }
    
        //提供給外部增加RPCmethod的方法,引數為object時,表明該object是其呼叫對應類的方法時要反射執行方法的物件
        public void addService(Object object) throws ClassNotHaveRPCAnnotation {
            Class klass = object.getClass();
    
            if(!klass.isAnnotationPresent(RPCclass.class)) {
                throw new ClassNotHaveRPCAnnotation("class[" + klass + "]沒有RPCclass註解");
            }
    
            addService(klass, object);
        }
    
        //若只給一個Class物件,如果該物件是自動裝載,則自動創造一個新的物件,若該物件是另外裝入,則RPCMethodDefinition裡的object暫為null
        public void addService(Class klass) throws ClassNotHaveRPCAnnotation {
            if(!klass.isAnnotationPresent(RPCclass.class)) {
                throw new ClassNotHaveRPCAnnotation("class[" + klass + "]沒有RPCclass註解");
            }
    
            RPCclass rpCclass = (RPCclass) klass.getAnnotation(RPCclass.class);
            try {
                Object object = rpCclass.auto() ? klass.newInstance() : null;
                addService(klass, object);
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            }
        }
    
        private void addService(Class klass, Object object) {
                Method[] methods = klass.getDeclaredMethods();
                for(Method method : methods) {
                    if(!method.isAnnotationPresent(RPCmethod.class)) {
                        continue;
                    }
    
                    RPCmethod rpcMethod = method.getAnnotation(RPCmethod.class);
                    String remoteName = rpcMethod.remoteName();
                    RPCMethodDefinition rdf = serviceMap.get(remoteName);
                    //若rdf不為null,則表明是延遲載入的物件,只需設定對應的object就可以
                    if(rdf != null) {
                        rdf.setObject(object);
                        continue;
                    }
                    serviceMap.put(remoteName, new RPCMethodDefinition()
                            .setKlass(klass)
                            .setMethod(method)
                            .setObject(object));
                }
        }
    
    }
    
     
  • PackageScanner
    import java.io.File;
    import java.io.IOException;
    import java.net.JarURLConnection;
    import java.net.URISyntaxException;
    import java.net.URL;
    import java.util.Enumeration;
    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;
    
    public abstract class PackageScanner {
        private ClassLoader classLoader;
    
        public PackageScanner() {
        }
    
        //實參為一個類,則得到其所在包的路徑
        public void packageScanner(Class<?> clazz) {
            this.packageScanner(clazz.getPackage().getName());
        }
    
        //實參為包路徑,例如com.mec
        public void packageScanner(String rootPackage) {
            String rootPath = rootPackage.replace(".", "/");
            this.classLoader = Thread.currentThread().getContextClassLoader();
    
            try {
                //通過這種可以得到包錄路徑下所有的類,包括jar包裡
                Enumeration urls = this.classLoader.getResources(rootPath);
    
                while(urls.hasMoreElements()) {
                    URL url = (URL)urls.nextElement();
                    String jarProtocol = url.getProtocol();
                    //普通類的jarProtocol為file,jar包裡的類為jar
                    if (jarProtocol.equals("file")) {
                        try {
                            File file = new File(url.toURI());
                            this.scanPackage(file.getAbsolutePath(), rootPackage);
                        } catch (URISyntaxException var7) {
                            var7.printStackTrace();
                        }
                    } else if (jarProtocol.equals("jar")) {
                        this.scanPackage(url);
                    }
                }
            } catch (IOException var8) {
                var8.printStackTrace();
            }
    
        }
    
        //這是拋給外部來處理遍歷到到的類的方法
        public abstract void dealClass(Class<?> var1);
    
        //處理不是jar包裡的類,利用遞迴遍歷的方法全部過一遍
        private void scanPackage(String path, String packageName) {
            File curFile = new File(path);
            if (curFile.exists()) {
                File[] files = curFile.listFiles();
                File[] var5 = files;
                int var6 = files.length;
    
                for(int var7 = 0; var7 < var6; ++var7) {
                    File file = var5[var7];
                    if (file.isDirectory()) {
                        this.scanPackage(file.getAbsolutePath(), packageName + "." + file.getName());
                    } else if (file.isFile() && file.getName().endsWith(".class")) {
                        String fileName = file.getName();
                        int dotInde = fileName.indexOf(".class");
                        fileName = fileName.substring(0, dotInde);
                        String className = packageName + "." + fileName;
    
                        try {
                            Class<?> klass = Class.forName(className);
                            this.dealClass(klass);
                        } catch (ClassNotFoundException var13) {
                            var13.printStackTrace();
                        }
                    }
                }
    
            }
        }
    
        //掃描jar包裡的類
        private void scanPackage(URL url) {
            try {
                JarURLConnection jarURLConnection = (JarURLConnection)url.openConnection();
                JarFile jarFile = jarURLConnection.getJarFile();
                Enumeration jarEntries = jarFile.entries();
    
                while(jarEntries.hasMoreElements()) {
                    JarEntry jarEntry = (JarEntry)jarEntries.nextElement();
                    if (!jarEntry.isDirectory() && jarEntry.getName().endsWith(".class")) {
                        String className = jarEntry.getName();
                        int dotIndex = className.indexOf(".class");
                        className = className.substring(0, dotIndex).replace("/", ".");
                        if (className.startsWith("com.mec")) {
                            try {
                                Class<?> klass = Class.forName(className);
                                this.dealClass(klass);
                            } catch (ClassNotFoundException var9) {
                                var9.printStackTrace();
                            }
                        }
                    }
                }
            } catch (IOException var10) {
                var10.printStackTrace();
            }
    
        }
    }
  • Studen(測試用的類)
    import com.mec.rpc.annotation.RPCclass;
    import com.mec.rpc.annotation.RPCmethod;
    
    @RPCclass
    public class Student {
        private String name;
    
        @RPCmethod(remoteName = "getStudentName")
        public String getName() {
            return name == null ? "小明" : name;
        }
    
        @RPCmethod(remoteName = "setStudentName")
        public Student setName(String name) {
            this.name = name;
            return this;
        }
    }
  • ServerTest
    import java.io.IOException;
    
    public class ServerTest {
        public static void main(String[] args) {
            //建立一個RPC伺服器並註冊一個學生為小綠
            RPCServer rpcServer = new RPCServer();
            Student student = new Student();
            student.setName("小綠");
            rpcServer.scanPackage("com.mec.Test");
            rpcServer.addService(student);
            try {
                rpcServer.start(54196);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
  • ClientTest
    import com.mec.rpc.core.RPCProxy;
    
    public class ClientTest {
        public static void main(String[] args) {
            RPCProxy localProxy = new RPCProxy("localhost", 54196, 5000);
            Student student = localProxy.getProxy(Student.class);
            System.out.println(student.getName());
        }
    }
  • 結果

 如果失敗的話輸出的應該是小明,輸出的是小綠則證明成功。

通過包掃描和註解的方式可以實現自動注入而無須手動新增RPC服務,包掃描技術非常方便的讓我們處理類,通過反射可以獲得有相應註解的類和方法是關鍵。