Java實現RPC(服務物件使用註解並自動注入)
阿新 • • 發佈:2018-12-16
- 使用到的技術:
- 註解和反射機制
- 包掃描以及jar包掃描
- CGlib動態代理
- 類似於spring框架的控制反轉依賴自動注入技術
- 目錄結構:
- RPCclass註解
/**
 * Marks a class whose {@code RPCmethod}-annotated methods can be registered as RPC services.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RPCclass {

    /**
     * Whether the server should create the service object itself.
     *
     * @return {@code true} (the default) if {@code RPCServer.addService(Class)} should
     *         instantiate the class; {@code false} if the service object is supplied later
     */
    boolean auto() default true;
}
- RPCmethod註解
/**
 * Marks a method as remotely callable and gives it the name used on the wire.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RPCmethod {

    /**
     * The name under which the method is registered on the server and requested by the client.
     *
     * @return the remote service name; there is no default, so it must always be given
     */
    String remoteName();
}
- RPClock
/**
 * Callback that can be attached to a proxy object created by {@code RPCProxy}.
 */
public interface RPClock {

    /**
     * Called by {@code RPCProxy} once a remote call made through the associated proxy has
     * finished, whether it succeeded or failed.
     */
    void wakeUpLock();
}
- RPCMethodDefinition
import java.lang.reflect.Method;

/**
 * Describes one remotely callable method: the class that declares it, the object it is
 * invoked on, and the reflective {@link Method} handle. All setters return {@code this}
 * so that a definition can be built in a single chained expression.
 */
public class RPCMethodDefinition {

    /** Class that is called through reflection. */
    private Class klass;

    /** Object on which the method is executed through reflection. */
    private Object object;

    /** Method to execute. */
    private Method method;

    // ---- fluent setters ----

    /**
     * @param klass the class that declares the method
     * @return this definition
     */
    public RPCMethodDefinition setKlass(Class klass) {
        this.klass = klass;
        return this;
    }

    /**
     * @param object the object to invoke the method on; may be {@code null}
     * @return this definition
     */
    public RPCMethodDefinition setObject(Object object) {
        this.object = object;
        return this;
    }

    /**
     * @param method the method to invoke
     * @return this definition
     */
    public RPCMethodDefinition setMethod(Method method) {
        this.method = method;
        return this;
    }

    // ---- getters ----

    /** @return the class that declares the method */
    public Class getKlass() {
        return klass;
    }

    /** @return the object the method is invoked on, or {@code null} if none has been set */
    public Object getObject() {
        return object;
    }

    /** @return the method to invoke */
    public Method getMethod() {
        return method;
    }
}
- RPCProxy
import com.google.gson.Gson; import com.mec.rpc.annotation.RPCmethod; import com.mec.rpc.exception.MethodNotHaveRPCAnnotation; import com.mec.rpc.exception.RPCNotFoundProxyObject; import com.mec.rpc.exception.RPCProxyReadOutTimeException; import com.mec.uitl.GsonUitl; import net.sf.cglib.proxy.Enhancer; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.Method; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; public class RPCProxy { //RPC伺服器IP private String serverIp; //RPC伺服器埠 private int port; //可以將物件序列化(實際上是轉為json字串)的一種工具 private static Gson gson; //設定超時時間 private int waitingTime; //RPC伺服器返回結果後通知的物件(這個是我自己實際工程中的應用,可以忽略) private Map<Object, RPClock> wakeUpMap; static { //相當於gson = new GsonBuilder().create(); gson = GsonUitl.gson; } public RPCProxy(String serverIp, int port, int waitingTime) { this.serverIp = serverIp; this.port = port; this.waitingTime = waitingTime; this.wakeUpMap = new HashMap<Object, RPClock>(); } public String getServerIp() { return serverIp; } public int getport() { return port; } public RPCProxy setwaitingTime(int waitingTime) { this.waitingTime = waitingTime; return this; } //通過Object取得代理(由於客戶端要通過RPC呼叫的方法是已知的,因此不存在再執行過程中動態代理) public <T> T getProxy(Object object) { return getProxy(object.getClass()); } //通過Class取得代理 public <T> T getProxy(Class klass) { //這是CGlib代理的一般步驟 Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(klass); enhancer.setCallback(new MethodInterceptor() { //這是通過代理物件執行方法時真正執行的程式碼 public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { //因為本人工程的需要要避免對hashCode、equals和toSting方法的遠端呼叫 if(method.getName().equals("hashCode") || method.getName().equals("equals") || method.getName().equals("toString")) { return methodProxy.invokeSuper(o, objects); } 
//要求要被代理的方法必須帶有RPCmethod註解,否則丟擲異常 if(!method.isAnnotationPresent(RPCmethod.class)) { throw new MethodNotHaveRPCAnnotation("方法[" + method + "]沒有RPCmethod註解"); } //取得被代理方法的上的註解 RPCmethod rpcMethod = method.getAnnotation(RPCmethod.class); //取得被代理方法的遠端呼叫方法的名字 String remoteName = rpcMethod.remoteName(); //將每個引數序列化並依次裝入列表裡 ArrayList<String> parameter = new ArrayList(); int parameterCount = objects.length; for(int i = 0; i < parameterCount; i++) { parameter.add(gson.toJson(objects[i])); } //和RPC伺服器建立通訊通道 Socket socket = new Socket(serverIp, port); DataInputStream dis = new DataInputStream(socket.getInputStream()); DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); //初始化時間控制執行緒 Object lock = new Object(); ControlThread controlThread = new ControlThread(lock, dis, dos); synchronized (lock) { new Thread(controlThread).start(); lock.wait(); } //首先發送遠端呼叫方法的remoteName dos.writeUTF(remoteName); //如果引數大於0,則把引數列表序列化併發送 if(parameterCount > 0) { //序列化為json字串 String str = gson.toJson(parameter); //將字串轉換為二進位制傳送,需先發送一個頭部表明需要接收的長度 byte[] bytes = str.getBytes(); dos.writeInt(bytes.length); dos.write(bytes); } Object result = null; try { //等待RPC伺服器返回結果 int length = dis.readInt(); byte[] bytes = new byte[length]; dis.readFully(bytes, 0 , length); String reStr = new String(bytes); if (reStr.equals("null")) { return null; } result = gson.fromJson(reStr.trim(), method.getGenericReturnType()); } catch (Exception e) { //發生異常有三種情況, //一、無法和伺服器建立連線 //二、通訊過程中和伺服器斷開連線 //三、伺服器處理請求超時 e.printStackTrace(); throw new RPCProxyReadOutTimeException("向伺服器【" + serverIp + "】請求資料超時"); } finally { //無論是否成功都要通知相應的RPClock RPClock rpClock = wakeUpMap.get(o); if(rpClock != null) { rpClock.wakeUpLock(); } //關閉輸入通道 synchronized (dis) { try { dis.close(); } catch (IOException e) { } } //關閉輸出通道 synchronized (dos) { try { dos.close(); } catch (IOException e) { } } //關閉套接字 try { socket.close(); } catch (IOException e) { } } return result; } }); Object cglibProxy = enhancer.create(); 
//每一個代理物件對應一個RPClock wakeUpMap.put(cglibProxy, null); return (T) cglibProxy; } //通過新增RPClock的方法 public void putRPClock(Object object, RPClock rpcLock) throws RPCNotFoundProxyObject { boolean found = false; for(Object o: wakeUpMap.keySet()) { if(o.equals(object)) { found = true; } } if(!found) { throw new RPCNotFoundProxyObject("引數" + object + "未在" + this +"代理中找到"); } wakeUpMap.put(object, rpcLock); } //遠端呼叫時間控制執行緒 private class ControlThread implements Runnable { private DataInputStream dis; private DataOutputStream dos; private Object lock; public ControlThread(Object lock, DataInputStream dis, DataOutputStream dos) { this.lock = lock; this.dis = dis; this.dos = dos; } public void run() { synchronized (lock) { lock.notify(); } try { Thread.sleep(waitingTime); } catch (InterruptedException e) { e.printStackTrace(); } //超出時間以後直接若socket未關閉則強制關閉 try { synchronized (dis) { dis.close(); } synchronized (dos) { dos.close(); } } catch (IOException e) { } } } }
- RPCServer
import com.google.gson.Gson; import com.mec.rpc.annotation.RPCclass; import com.mec.rpc.annotation.RPCmethod; import com.mec.rpc.exception.ClassNotHaveRPCAnnotation; import com.mec.rpc.exception.RPCMethodParametersLengthMismatching; import com.mec.rpc.exception.RPCServiceNotRegister; import com.mec.rpc.exception.RPCServiceObjectNotLoad; import com.mec.uitl.GsonUitl; import com.mec.uitl.PackageScanner; import com.mec.uitl.TypeUitl; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Type; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; public class RPCServer implements Runnable { //用來裝可以被遠端呼叫的方法 private Map<String ,RPCMethodDefinition> serviceMap; //控制伺服器偵聽客戶端執行緒的變數 private volatile boolean goon; private ServerSocket serverSocket; //json序列化工具 private static Gson gson; //用來得到傳過來的實參 private static Type type; static { gson = GsonUitl.gson; // gson = new GsonBuilder().create(); type = TypeUitl.type; // type = new TypeToken<ArrayList<String>>() {}.getType(); } public RPCServer() { serviceMap = new HashMap<String ,RPCMethodDefinition>(); goon = false; } //啟動RPC伺服器 public void start(int port) throws IOException { serverSocket = new ServerSocket(port); goon = true; new Thread(this).start(); } //關閉伺服器 public void stop() { if(serverSocket == null || serverSocket.isClosed()) { return; } goon = false; try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } finally { serverSocket = null; } } //伺服器主執行緒,偵聽到就啟動一個執行緒去處理RPC請求 public void run() { while (goon) { try { Socket socket = serverSocket.accept(); new Thread(new DealRequest(socket)).start(); } catch (IOException e) { if(goon) { e.printStackTrace(); } else { break; } } } } //處理遠端掉用的請求並返回結果 private class DealRequest implements Runnable { private Socket socket; private 
DataOutputStream dos; private DataInputStream dis; public DealRequest(Socket socket) { this.socket = socket; try { dis = new DataInputStream(socket.getInputStream()); dos = new DataOutputStream(socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); } } //這裡的讀和寫與RPCProxy寫和讀一一對應 public void run() { try { //讀取遠端呼叫的remoteName String remoteName = dis.readUTF(); //從RPC服務集合裡取出對應的RPCMethodDefinition RPCMethodDefinition rdf = serviceMap.get(remoteName); if(rdf == null) { throw new RPCServiceNotRegister("服務" + remoteName + "未註冊"); } Object object = rdf.getObject(); if(object == null) { throw new RPCServiceObjectNotLoad("服務" + remoteName + "物件未載入"); } Method method = rdf.getMethod(); Type[] parameters = method.getGenericParameterTypes(); int parameterCount = parameters.length; Object result = null; if(parameterCount <= 0) { //如果引數為0,直接反射執行該方法 result = method.invoke(object); } else { //如果引數不為0.先接收序列化的實參,再進行反序列化 int length = dis.readInt(); byte[] bytes = new byte[length]; dis.readFully(bytes, 0 , length); String parameter = new String(bytes); ArrayList<String> parameterList = gson.fromJson(parameter, type); if(parameterCount != parameterList.size()) { throw new RPCMethodParametersLengthMismatching("服務" + remoteName + "引數個數不匹配" + parameterCount + " " + parameterList.size()); } Object[] argus = new Object[parameterCount]; for (int i = 0; i < parameterCount; i++) { argus[i] = gson.fromJson(parameterList.get(i), parameters[i]); } result = method.invoke(object, argus); } byte[] bytes= gson.toJson(result).getBytes(); dos.writeInt(bytes.length); dos.write(bytes); //確認資料全部發送完畢之後關閉相應資源 dos.flush(); } catch (IOException e) { e.printStackTrace(); } catch (RPCServiceNotRegister rpcServiceNotRegister) { rpcServiceNotRegister.printStackTrace(); } catch (RPCServiceObjectNotLoad rpcServiceObjectNotLoad) { rpcServiceObjectNotLoad.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch 
(RPCMethodParametersLengthMismatching rpcMethodParametersLengthMismatching) { rpcMethodParametersLengthMismatching.printStackTrace(); } finally { try { dis.close(); } catch (IOException e) { e.printStackTrace(); } try { dos.close(); } catch (IOException e) { e.printStackTrace(); } try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } //自動掃描指定包下的所有類,當檢測到帶有RPCclass註解的類時實行自動裝入 public void scanPackage(String path) { //自制的包掃描工具,可以遍歷指定包下的所有類 new PackageScanner() { public void dealClass(Class<?> klass) { try { if (klass.isAnnotationPresent(RPCclass.class)) { addService(klass); } } catch (ClassNotHaveRPCAnnotation classNotHaveRPCAnnotation) { return; } } }.packageScanner(path); } //提供給外部增加RPCmethod的方法,引數為object時,表明該object是其呼叫對應類的方法時要反射執行方法的物件 public void addService(Object object) throws ClassNotHaveRPCAnnotation { Class klass = object.getClass(); if(!klass.isAnnotationPresent(RPCclass.class)) { throw new ClassNotHaveRPCAnnotation("class[" + klass + "]沒有RPCclass註解"); } addService(klass, object); } //若只給一個Class物件,如果該物件是自動裝載,則自動創造一個新的物件,若該物件是另外裝入,則RPCMethodDefinition裡的object暫為null public void addService(Class klass) throws ClassNotHaveRPCAnnotation { if(!klass.isAnnotationPresent(RPCclass.class)) { throw new ClassNotHaveRPCAnnotation("class[" + klass + "]沒有RPCclass註解"); } RPCclass rpCclass = (RPCclass) klass.getAnnotation(RPCclass.class); try { Object object = rpCclass.auto() ? 
klass.newInstance() : null; addService(klass, object); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } private void addService(Class klass, Object object) { Method[] methods = klass.getDeclaredMethods(); for(Method method : methods) { if(!method.isAnnotationPresent(RPCmethod.class)) { continue; } RPCmethod rpcMethod = method.getAnnotation(RPCmethod.class); String remoteName = rpcMethod.remoteName(); RPCMethodDefinition rdf = serviceMap.get(remoteName); //若rdf不為null,則表明是延遲載入的物件,只需設定對應的object就可以 if(rdf != null) { rdf.setObject(object); continue; } serviceMap.put(remoteName, new RPCMethodDefinition() .setKlass(klass) .setMethod(method) .setObject(object)); } } }
- PackageScanner
import java.io.File;
import java.io.IOException;
import java.net.JarURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

/**
 * Walks every class of a package, both in class directories and inside jar files, and
 * hands each loaded class to {@link #dealClass(Class)}.
 */
public abstract class PackageScanner {
    private static final String CLASS_SUFFIX = ".class";

    private ClassLoader classLoader;

    public PackageScanner() {
    }

    /**
     * Scans the package that contains {@code clazz}.
     *
     * @param clazz a class whose package should be scanned; a class without a package
     *              causes the default package to be scanned
     */
    public void packageScanner(Class<?> clazz) {
        Package pkg = clazz.getPackage();
        this.packageScanner(pkg == null ? "" : pkg.getName());
    }

    /**
     * Scans a package and its sub-packages.
     *
     * @param rootPackage dotted package name such as {@code com.mec}; the empty string
     *                    denotes the default package
     */
    public void packageScanner(String rootPackage) {
        String rootPath = rootPackage.replace(".", "/");
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // A thread may have no context class loader; fall back to the system one.
            loader = ClassLoader.getSystemClassLoader();
        }
        this.classLoader = loader;
        try {
            // getResources yields one URL per class-path entry that contains the package.
            Enumeration<URL> urls = loader.getResources(rootPath);
            while (urls.hasMoreElements()) {
                URL url = urls.nextElement();
                String protocol = url.getProtocol();
                // Classes in a directory use the "file" protocol, classes in a jar use "jar".
                if ("file".equals(protocol)) {
                    try {
                        File directory = new File(url.toURI());
                        this.scanPackage(directory.getAbsolutePath(), rootPackage);
                    } catch (URISyntaxException e) {
                        e.printStackTrace();
                    }
                } else if ("jar".equals(protocol)) {
                    this.scanPackage(url, rootPackage);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Called once for every class found by the scan.
     *
     * @param var1 the class that was found and loaded
     */
    public abstract void dealClass(Class<?> var1);

    /** Recursively scans a class directory; {@code packageName} is the package {@code path} maps to. */
    private void scanPackage(String path, String packageName) {
        File[] files = new File(path).listFiles();
        if (files == null) {
            // listFiles() returns null when the path is missing, not a directory, or unreadable.
            return;
        }
        for (File file : files) {
            String fileName = file.getName();
            if (file.isDirectory()) {
                this.scanPackage(file.getAbsolutePath(), qualify(packageName, fileName));
            } else if (file.isFile() && fileName.endsWith(CLASS_SUFFIX)) {
                this.loadAndDeal(qualify(packageName, stripClassSuffix(fileName)));
            }
        }
    }

    /** Scans the jar behind {@code url}, handling only classes inside {@code rootPackage}. */
    private void scanPackage(URL url, String rootPackage) {
        // The jar is enumerated in full, so entries outside the requested package are
        // filtered here by prefix. An empty root package accepts everything.
        String requiredPrefix = rootPackage.isEmpty() ? "" : rootPackage + ".";
        try {
            JarURLConnection jarURLConnection = (JarURLConnection) url.openConnection();
            JarFile jarFile = jarURLConnection.getJarFile();
            Enumeration<JarEntry> jarEntries = jarFile.entries();
            while (jarEntries.hasMoreElements()) {
                JarEntry jarEntry = jarEntries.nextElement();
                String entryName = jarEntry.getName();
                if (jarEntry.isDirectory() || !entryName.endsWith(CLASS_SUFFIX)) {
                    continue;
                }
                String className = stripClassSuffix(entryName).replace("/", ".");
                if (className.startsWith(requiredPrefix)) {
                    this.loadAndDeal(className);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Loads {@code className} and passes it to {@link #dealClass(Class)}; unloadable classes are reported and skipped. */
    private void loadAndDeal(String className) {
        Class<?> klass;
        try {
            klass = Class.forName(className);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return;
        } catch (LinkageError e) {
            // One class that cannot be linked or initialized must not abort the whole scan.
            e.printStackTrace();
            return;
        }
        this.dealClass(klass);
    }

    /** Joins a package name and a simple name; the default package contributes no leading dot. */
    private static String qualify(String packageName, String simpleName) {
        return packageName.isEmpty() ? simpleName : packageName + "." + simpleName;
    }

    /** Removes the trailing {@code .class} from a file or jar entry name. */
    private static String stripClassSuffix(String name) {
        return name.substring(0, name.length() - CLASS_SUFFIX.length());
    }
}
- Student(測試用的類)
import com.mec.rpc.annotation.RPCclass;
import com.mec.rpc.annotation.RPCmethod;

/**
 * Sample service class used by the test programs. Its getter and setter are published
 * under the remote names {@code getStudentName} and {@code setStudentName}.
 */
@RPCclass
public class Student {

    private String name;

    /**
     * @return the student's name, or the placeholder name when none has been set
     */
    @RPCmethod(remoteName = "getStudentName")
    public String getName() {
        if (name != null) {
            return name;
        }
        return "小明";
    }

    /**
     * @param name the new name
     * @return this student, to allow chaining
     */
    @RPCmethod(remoteName = "setStudentName")
    public Student setName(String name) {
        this.name = name;
        return this;
    }
}
- ServerTest
import java.io.IOException; public class ServerTest { public static void main(String[] args) { //建立一個RPC伺服器並註冊一個學生為小綠 RPCServer rpcServer = new RPCServer(); Student student = new Student(); student.setName("小綠"); rpcServer.scanPackage("com.mec.Test"); rpcServer.addService(student); try { rpcServer.start(54196); } catch (IOException e) { e.printStackTrace(); } } }
- ClientTest
import com.mec.rpc.core.RPCProxy;

/**
 * Calls {@code Student.getName()} through an RPC proxy against the server on
 * localhost:54196 and prints the result.
 */
public class ClientTest {

    public static void main(String[] args) {
        // Third argument: time limit for a remote call, in milliseconds.
        RPCProxy proxyFactory = new RPCProxy("localhost", 54196, 5000);
        Student remoteStudent = proxyFactory.getProxy(Student.class);
        String name = remoteStudent.getName();
        System.out.println(name);
    }
}
- 結果
如果遠端呼叫沒有生效,輸出的會是預設值「小明」;輸出的是「小綠」則證明遠端呼叫成功。
通過包掃描和註解的方式可以實現自動注入,而無須手動新增RPC服務。包掃描技術讓我們可以非常方便地處理類,而關鍵在於通過反射取得帶有相應註解的類和方法。