RPC遠端呼叫
阿新 • • 發佈:2019-01-10
客戶端通過定義介面,通過動態代理的方式向服務端請求執行結果,服務端通過解析字串,來解析所要呼叫介面、方法、引數,並將介面實現的結果通過Socket的方式返回給客戶端。
1. 服務端將介面通過map的方式對映成方法名和介面類,並啟動socket連線。
package server; public interface ServerCenter { public void start(); public void stop(); public void register(Class service , Class serviceImpl); }
package serverImpl; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import server.ServerCenter; public class ServerCenterImpl implements ServerCenter { private final int port = 12000; @SuppressWarnings("rawtypes") private static ConcurrentHashMap<String, Class> register = new ConcurrentHashMap<>(); // 連線池:連線池中存在多個連線物件,每個物件都可以處理一個客戶請求 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 是否啟動執行緒 private static boolean isRunning = false; @SuppressWarnings("resource") @Override public void start() { ServerSocket server = null; try { server = new ServerSocket(); server.bind(new InetSocketAddress(port)); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } isRunning = true; System.out.println("開啟服務執行緒..."); while (true) { Socket socket = null; try { socket = server.accept(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } executor.execute(new ServiceTask(socket)); } } // 關停請求執行緒 @Override public void stop() { // 如果有執行緒執行,則關閉 if (isRunning) { executor.shutdown(); } } // 註冊服務 @Override public void register(Class service, Class serviceImpl) { register.put(service.getName(), serviceImpl); } private static class ServiceTask implements Runnable { private Socket socket; public ServiceTask(Socket socket) { this.socket = socket; } @Override public void run() { ObjectInputStream input = null; ObjectOutputStream output = null; try { // 等待客戶端連線 input = new ObjectInputStream(socket.getInputStream()); // 序列流需要順序接收解析 String serviceName = input.readUTF(); String methodName = input.readUTF(); @SuppressWarnings("rawtypes") Class[] parametersTypes = (Class[]) input.readObject(); Object[] args = (Object[]) input.readObject(); // 根據client請求找到具體介面 Class<?> serviceClass = register.get(serviceName); // 根據方法名和引數型別獲取方法 Method method = serviceClass.getMethod(methodName, parametersTypes); // 執行方法 Object res = method.invoke(serviceClass.newInstance(), args); // 返回結果 output = new ObjectOutputStream(socket.getOutputStream()); output.writeObject(res); } catch (Exception e) { e.printStackTrace(); } finally { if (input != null) { try { input.close(); } catch (IOException e) { e.printStackTrace(); } } if (output != null) { try { output.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
2. 客戶端與服務端建立連線,通過JDK動態代理方式將請求的介面名、方法名、引數型別、引數列表傳送給服務端。服務端解析這些欄位,呼叫服務端的介面實現,將結果返回給客戶端。
package client; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; public class Client { @SuppressWarnings("unchecked") public static <T> T getRemoteProxyRes(Class<?> serviceInterface,InetSocketAddress addr){ /** * 1. 類載入器 classLoader: 載入需要代理的類(HelloServic介面) * 2. 需要代理物件,具有哪些功能 介面 * 3. */ return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[] {serviceInterface} , new InvocationHandler() { /* * (non-Javadoc) * @see java.lang.reflect.InvocationHandler#invoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[]) * proxy: 代理物件 * method: 哪個方向 * args :引數列表 */ @Override public Object invoke(Object proxy , Method method, Object[] args) throws Throwable { ObjectOutputStream output = null; ObjectInputStream input = null; @SuppressWarnings("resource") Socket socket = new Socket(); socket.connect(addr); try { // 傳送資訊 output = new ObjectOutputStream(socket.getOutputStream()); // 介面名 output.writeUTF(serviceInterface.getName()); // 方法名 output.writeUTF(method.getName()); // 引數型別 output.writeObject(method.getParameterTypes()); // 引數列表 output.writeObject(args); // 等待伺服器處理... // 讀取伺服器返回的結果 input = new ObjectInputStream(socket.getInputStream()); return input.readObject(); } catch (Exception e) { e.printStackTrace(); return null; } finally { if (input != null){ try { input.close(); } catch (IOException e) { e.printStackTrace(); } } if (output != null){ try { output.close(); } catch (IOException e) { e.printStackTrace(); } } } } }); } }