從頭建立一個簡單的RPC服務框架
概念解釋
RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。RPC採用客戶機/伺服器模式。請求程式就是一個客戶機,而服務提供程式就是一個伺服器。首先,客戶機呼叫程序傳送一個有程序引數的呼叫資訊到服務程序,然後等待應答資訊。在伺服器端,程序保持睡眠狀態直到呼叫資訊到達為止。當一個呼叫資訊到達,伺服器獲得程序引數,計算結果,傳送答覆資訊,然後等待下一個呼叫資訊,最後,客戶端呼叫程序接收答覆資訊,獲得程序結果,然後呼叫執行繼續進行。
ServerSocket 與 Socket
- 客戶端與服務端建立連線基本原理
- 客戶端與服務端建立連線流程
ServerSocket
ServerSocket server = new ServerSocket(1234); Socket socket = server.accept();
建立並監聽一個埠為1234的ServerSocket物件,然後呼叫了ServerSocket物件的accept()方法,這個方法的執行將使Server端的程式處於阻塞狀態,程式將一直阻塞直到捕捉到一個來自Client端的請求,並返回一個用於與該Client通訊的Socket物件。此後Server程式只要向這個Socket物件讀寫資料,就可以實現向遠端的Client讀寫資料。結束監聽時,關閉ServerSocket物件
Socket
Socket socket = new Socket("127.0.0.1", 1234);
客戶端建立一個socket物件,與服務端建立連線,服務端管理客戶連線請求的任務由作業系統完成,作業系統把連線請求儲存在一個先進先出的佇列中,佇列長度一般為50。當服務端連線佇列長度大於50時,連線被拒絕。
- 客戶端與服務端建立連線基本原理
動態代理機制
java的動態代理機制中,有兩個重要的類和介面,Proxy、InvocationHandler,他們都在java.lang.reflect包下,這個類和介面是在實現動態代理過程中必須用到的Proxy
Proxy用於建立一個代理類物件,它提供了許多方法,其中應用最多的是 newProxyInstance這個方法。public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
引數解釋:
loader: 一個ClassLoader物件,定義了代理物件是由哪個Classloader物件來載入
interfaces: 一個interface陣列,表示代理我們將為代理物件提供一組什麼介面,如果我提供了一組介面,則代理物件就宣稱實現了該組介面,這樣我們就能通過代理物件呼叫相應介面了
h: 一個InvocationHandler物件,表示動態代理物件在呼叫方法時具體執行的物件InvocationHandler
每個動態代理類都必須實現InvocationHandler介面,且每個代理類的例項都關聯到一個InvocationHandler物件,當代理物件呼叫一個方法時,這個方法的呼叫會對映到InvocationHandler的invoke方法執行public Object invoke(Object proxy, Method method, Object[] args)
引數解釋:
proxy: 指代代理的真實物件
method: 指代所要呼叫的真實物件的某個方法
args: 指代呼叫真實物件方法時接受的引數動態代理示例
定義測試介面
public interface TestHello { void hello(String name); void sayGood(); }
定義介面實現
public class TestHelloImpl implements TestHello{ @Override public void hello(String name) { System.out.println("hello " + name); } @Override public void sayGood() { System.out.println("you are the best!"); } }
定義動態代理類
public class MyProxy implements InvocationHandler { private Object proxy; public MyProxy(Object proxy) { this.proxy = proxy; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { method.invoke(this.proxy, args); return null; } }
模擬代理呼叫
public class Client { public static void main(String[] args) { TestHello th = new TestHelloImpl(); MyProxy mp = new MyProxy(th); TestHello thProxy = (TestHello) Proxy.newProxyInstance(mp.getClass().getClassLoader(), th.getClass().getInterfaces(), mp); thProxy.hello("Jerry"); thProxy.sayGood(); } }
測試輸出
hello Jerry you are the best!
一個簡單的RPC實現
簡單RPC框架類,定義服務註冊方法 export, 服務消費方法 refer
public class RpcFramework { /** * 暴露服務 * * @param service 服務實現 * @param port 服務埠 * @throws Exception */ public static void export(final Object service, int port) throws Exception { if (service == null) throw new IllegalArgumentException("service instance == null"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("Invalid port " + port); System.out.println("Export service " + service.getClass().getName() + " on port " + port); ServerSocket server = new ServerSocket(port); for(;;) { try { final Socket socket = server.accept(); new Thread(new Runnable() { @Override public void run() { try { try { ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[])input.readObject(); Object[] arguments = (Object[])input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } } }).start(); } catch (Exception e) { e.printStackTrace(); } } } /** * 引用服務 * * @param <T> 介面泛型 * @param interfaceClass 介面型別 * @param host 伺服器主機名 * @param port 伺服器埠 * @return 遠端服務 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null) throw new IllegalArgumentException("Interface class == null"); if (! interfaceClass.isInterface()) throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!"); if (host == null || host.length() == 0) throw new IllegalArgumentException("Host == null!"); if (port <= 0 || port > 65535) throw new IllegalArgumentException("Invalid port " + port); System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port); return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); InputStream is = socket.getInputStream(); ObjectInputStream input = new ObjectInputStream(is); try { Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
定義具體介面和實現
public interface HelloService { String hello(String name); }
public class HelloServiceImpl implements HelloService{ @Override public String hello(String name) { return "hello " + name; } }
生成並註冊服務物件到服務端
public class RpcProvider { public static void main(String[] args) { try { HelloService service = new HelloServiceImpl(); RpcFramework.export(service, 1235); } catch (Exception e) { e.printStackTrace(); } } }
客戶端宣告服務端定義的服務物件,使用服務
public class RpcConsumer { public static void main(String[] args) { try { HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1235); for (int i = 0; i < Integer.MAX_VALUE; i ++) { String hello = service.hello("World" + i); System.out.println(hello); Thread.sleep(1000); } } catch (Exception e) { e.printStackTrace(); } } }
注
- 本文多數內容非原創,僅供學習使用,歡迎指正!