分散式思想和rpc解決方案介紹
阿新 • • 發佈:2018-11-13
1.RPC的誕生
RPC(Remote Procedure Call)遠端過程呼叫,通過這個rpc協議,呼叫遠端計算機上的服務,就像呼叫本地的服務一樣。
不同的服務部署在不同的機器上面,並且在啟動後在註冊中心進行註冊,如果要呼叫,可以通過rpc呼叫對應的服務。
如圖,在不同的Controller中可以從註冊中心(可以使用eureka,zookeeper實現,本文例子使用簡單的hash
map作為實現)獲取可以呼叫的服務,然後通過rpc進行呼叫。
2.java遠端的遠端呼叫-RMI(Remote method Invoke)
java提供了遠端的對於遠端服務呼叫的支援:RMI(Remote method Invoke)。
3.手寫一個RPC框架
3.1 實現的技術方案
設計技術點:Socket通訊、動態代理與反射、Java序列化
RPC本質是使用動態代理,通過網路通訊技術進行增強。
3.2程式碼實現
3.2.1 客戶端程式碼
package main.java.rpc; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; //rpc框架的客戶端代理部分 public class RpcClientFrame { /*動態代理類,實現了對遠端服務的訪問*/ private static class DynProxy implements InvocationHandler{ //遠端呼叫的服務 private Class serviceClass; //遠端呼叫地址 private final InetSocketAddress addr; public DynProxy(Class serviceClass,InetSocketAddress addr) { this.serviceClass = serviceClass; this.addr = addr; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; Socket socket = null; try { socket = new Socket(); socket.connect(addr); //類名 方法名 方法型別列表 方法入參列表 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceClass.getSimpleName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); outputStream.flush(); inputStream = new ObjectInputStream(socket.getInputStream()); //我們要把呼叫的細節打印出來 System.out.println("遠端呼叫成功!" + serviceClass.getName()); //最後要網路的請求返回給返回 return inputStream.readObject(); } catch (Exception e) { e.printStackTrace(); } finally { socket.close(); inputStream.close(); outputStream.close(); } return null; } }
//定義客戶端要定義的服務
package enjoyedu.service;
/**
* 享學課堂
*類說明:服務員介面
*/
public interface TechInterface {
//洗腳服務
String XJ(String name);
}
package main.java; import main.java.rpc.RpcClientFrame; import main.java.service.TechInterface; /** * rpc的客戶端呼叫遠端服務 * @author hasee * */ public class Client { public static void main(String[] args) { //動態代理獲取我們的物件 TechInterface techInterface = (TechInterface) RpcClientFrame.getProxyObject(TechInterface.class); //進遠端呼叫我們的物件 System.out.println(techInterface.XJ("luke")); } }
3.2.2服務端和註冊中心程式碼
1.//服務端定義要呼叫的服務介面
package service;
public interface TechInterface {
//洗腳服務
String XJ(String name);
}
2.//服務端定義要呼叫的服務的介面實現類
package service.impl;
import service.TechInterface;
public class TechImpl implements TechInterface {
public String XJ(String name) {
return "您好,13號技師為你服務:"+name;
}
}
package server;
import java.io.IOException;
import javax.imageio.spi.RegisterableService;
import register.RegisterCenter;
import service.TechInterface;
import service.impl.TechImpl;
/**
* rpc的服務端,提供服務
* @author hasee
*
*/
public class Server {
public static void main(String[] args) throws IOException {
RegisterCenter registerCenter = new RegisterCenter(8888);
//註冊技師物件至註冊中心
registerCenter.register(TechInterface.class, TechImpl.class);
registerCenter.start();
}
}
package register;
/**
* 註冊中心,這個例子使用一個hashmap作為實現
* @author hasee
*
*/
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class RegisterCenter {
//執行緒池
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//定義註冊中心的靜態物件
private static Map<String, Class> serviceRegistry = new HashMap<String, Class>();
//服務埠
private static int port = 8888;
/**
* 註冊服務
* @param serviceInterface 介面名字
* @param impl 實現類的class物件
*/
public void register(Class serviceInterface, Class impl) {
//服務的註冊:socket通訊+反射
serviceRegistry.put(serviceInterface.getSimpleName(), impl);
}
public RegisterCenter(int port) {
this.port = port;
}
/**
* 啟動服務端
* @throws IOException
*/
public static void start() throws IOException {
// 建立ServerSocket例項監聽埠
ServerSocket serverSocket = new ServerSocket(port);
System.out.println("start server");
// 1.監聽客戶端的TCP連線,接到TCP連線後將其封裝成task,由執行緒池執行,並且同時將socket送入(server.accept()=socket)
try {
while (true) {
//serverSocket.accept()會阻塞直到服務端接受到客戶端的請求。
executorService.execute(new ServiceTask(serverSocket.accept()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 將客戶端的每一個請求都封裝成一個執行緒ServiceTask,投放到執行緒池裡面進行執行。
* @author hasee
*
*/
private static class ServiceTask implements Runnable {
private Socket client;
public ServiceTask(Socket client) {
this.client = client;
}
public void run() {
//讀取socket中的流資料
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
// 類名、方法名、引數型別、引數值
inputStream = new ObjectInputStream(client.getInputStream());
//獲取呼叫服務名稱
String serviceName = inputStream.readUTF();
//獲取呼叫方法的名稱
String methodName = inputStream.readUTF();
//獲取引數型別列表
Class<?>[] requiresTypes = (Class<?>[]) inputStream.readObject();
//獲取引數列表
Object[] args = (Object[]) inputStream.readObject();
Class serviceClass = serviceRegistry.get(serviceName);
//反射呼叫方法
Method method = serviceClass.getMethod(methodName, requiresTypes);
Object result = method.invoke(serviceClass.newInstance(), args);
//把結果反饋到客戶端
outputStream = new ObjectOutputStream(client.getOutputStream());
outputStream.writeObject(result);
outputStream.flush();
//關閉io資源
inputStream.close();
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
3.2.3 測試結果
- 先啟動服務端
- 其次啟動客戶端
輸出結果:您好,13號技師為你服務:luke