初識RPC中的RMI以及簡單的程式碼實現
RPC和RMI
RPC的全稱為Remote Procedure Call,即遠端過程呼叫。
RMI的全稱為Remote Method Invocation,即原程方法呼叫。
RPC客戶器和RPC伺服器可以執行在不同的java虛擬機器中。RPC客戶端只需要引入一個介面,而這個介面的實現以及呼叫時所需要的資料都在RPC伺服器中。在RPC伺服器在客戶端呼叫之前就將客戶端可能用到的所有介面regist(註冊),RPC客戶端在“執行”相關方法時,其實際上只是傳送相關方法的引數和序列號到RPC伺服器端,RPC伺服器在呼叫相關方法之後將呼叫的結果回送到RPC客戶端。也可以稱RPC客戶端為偽執行端,而RPC伺服器為真正執行端。RPC主要依賴的技術是序列化、反序列化、傳輸協議。RPC有多種框架,有RMI、Hessian、Dubbo框架等。
RMI的實現過程為:
- 伺服器將客戶端將要可能呼叫的所有介面註冊,並儲存。
- 客戶端使用動態代理代理我們將要在伺服器端呼叫的介面,使客戶端呼叫伺服器端相關方法就像呼叫本機方法一樣。
- 客戶端將本地將要呼叫的介面方法的引數及序列號通過網路傳送到伺服器。
- 伺服器接收到客戶端傳送的資訊後,從儲存這些方法中進行查詢,通過反射呼叫該方法,並進行一定的攔截。
- 伺服器完成攔截後將執行的結果回送給客戶端。
- 客戶端接收到執行結果。
下圖為實現的大概框圖(注:上面的1、2、3序列號與下圖中的序列號相互不對應)
上面說了那麼多(有點囉嗦,還可能表達不清楚),還是不如具體看程式碼來的實在。
首先是建立伺服器,其框架如圖
建立一個RPCServer類
private ServerSocket server;
private int rpcServerPort;//伺服器埠號
private boolean goon;//判斷伺服器是否繼續偵聽
private long count;//客戶端連線個數
private RpcBeanFactory rpcBeanFactory;//註冊相關方法時用到
類中方法:
public class RpcServer implements Runnable { public RpcServer(int rpcServerPort) { goon = false; this.rpcServerPort = rpcServerPort; this.rpcBeanFactory = new RpcBeanFactory(); } /* * 掃描包 */ public void scanPackage(String packageName) { new PackageScanner() {//自己的工具 @Override public void dealClass(Class<?> klass) { if (klass.isAnnotationPresent(RpcInterface.class)) {//掃描到帶有@RpcInterface的註解的類 Class<?>[] interfaces = klass.getInterfaces();//獲取該類所有實現了的介面 for (Class<?> inter : interfaces) { if (inter.equals(Serializable.class)) {//若介面為Serializable,則不註冊 continue; } RpcBeanRegistry.registInterface(inter, klass, rpcBeanFactory);//註冊相關介面 } } } }.packageScan(packageName);; } RpcBeanDefination getRpcBeanDefination(String rpcId) { return rpcBeanFactory.getRpcBeanDefination(rpcId); } /* * 建立伺服器,啟動執行緒 */ public void startRpc() { try { goon = true; server = new ServerSocket(rpcServerPort); new Thread(this, "RpcServer").start(); } catch (IOException e) { e.printStackTrace(); stop(); } } /* *關閉伺服器 */ private void stop() { if (server != null || !server.isClosed()) { try { server.close(); } catch (IOException e) { e.printStackTrace(); } finally { server = null; } } } /* * 執行緒 */ @Override public void run() { while (goon) { try { Socket socket = server.accept(); new RpcServerExcutor(socket, ++count, this);//進行訊息的傳遞 } catch (IOException e) { e.printStackTrace(); goon = false; //若偵聽發生異常,不在繼續偵聽 } } stop(); //在執行完後要關閉伺服器 } }
在這個類中執行緒啟動後,在本類中不進行訊息的傳送,而是在下一個類RpcServerExcutor中完成訊息的互動。因為客戶端傳送資料不是一次傳送完,而是將序列號和方法引數分開發送。使用上述做法是為了讓伺服器偵聽一次客戶端的請求,而客戶端可以傳送多條訊息,也就是為了實現伺服器和客戶端之間的短連線。(之所以要去掉Serializable介面,是因為客戶端傳遞的方法序列號必須是在該類已經實現了這個介面後才有的,但是在註冊時不需要將該介面註冊(實現了介面的類必須要實現Serializable介面,否則因沒有序列號而產生異常)) RpcServerExcutor類 成員:
private Socket socket;//偵聽到的客戶端
private ObjectInputStream ois;//輸入流
private ObjectOutputStream oos;//輸出流
private RpcServer rpcServer;//與RpcServer類之間進行聯絡
方法:
public class RpcServerExcutor implements Runnable {
//構造方法
RpcServerExcutor(Socket socket, long count, RpcServer rpcServer) throws IOException {
this.socket = socket;
this.rpcServer = rpcServer;
this.ois = new ObjectInputStream(this.socket.getInputStream());
this.oos = new ObjectOutputStream(this.socket.getOutputStream());
new Thread(this, "count").start();//啟動執行緒
}
/*
*斷開與客戶端的連線
*/
void closeRpc() {
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ois = null;
}
try {
if (oos != null) {
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
oos = null;
}
try {
if (socket != null || !socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
socket = null;
}
}
//執行緒
@Override
public void run() {
//獲取客戶端傳送過來的方法,物件,引數;根據這些資料確定方法;獲取方法的返回值;將返回值傳送給客戶
try {
String rpcId = ois.readUTF();//接收到序列號
Object[] parameters = (Object[])ois.readObject();//就收所有引數並構成陣列
RpcBeanDefination rpcBeanDefination = rpcServer.getRpcBeanDefination(rpcId);//根據序列號找到相關方法
Object object = rpcBeanDefination.getObject();//得到要執行方法的物件
Method method = rpcBeanDefination.getMethod();//得到方法
Object result = method.invoke(object, parameters);//通過物件和引數進行反射
oos.writeObject(result);//傳送執行結果
} catch (Exception e) {
e.printStackTrace();
closeRpc();//發生異常則關閉與該客戶端的連線
}
}
}
本類的作用就是將客戶端傳送的資料接收併發送執行後的結果,也就是真正與客戶端資訊互動的地方 在RpcServer類中提到了RpcBeanFactory類和RpcBeanRegistry類 RpcBeanFactory類:
public class RpcBeanFactory {
//其鍵為方法的序列號,值為RpcBeanDefination類(在後面講到)
private final Map<String, RpcBeanDefination> rpcBeanMap;
RpcBeanFactory() {
this.rpcBeanMap = new HashMap<>();
}
/*
*給rpcBeanMap中新增元素
*/
void addRpcBeanMap(String rpcId, RpcBeanDefination rpcBeanDefination) {
RpcBeanDefination rbd = rpcBeanMap.get(rpcId);
if (rbd != null) {//若該序列號的所有方法已經新增
return;
}
this.rpcBeanMap.put(rpcId, rpcBeanDefination);//真正的新增
}
/*
*通過方法序列號獲得RpcBeanDefination類中的所有成員
*/
RpcBeanDefination getRpcBeanDefination(String rpcId) {
return rpcBeanMap.get(rpcId);
}
}
在該類中沒有真正的新增,只是產生了新增的方法 RpcBeanRegistry類:
public class RpcBeanRegistry {
/*
*通過傳遞過來的介面型別、實現介面的類、rpcBeanFactory進行註冊
*/
static void registInterface(Class<?> interfaces, Class<?> klass,
RpcBeanFactory rpcBeanFactory) {
if (!interfaces.isAssignableFrom(klass)) {//若傳遞來的klass沒有實現interfaces介面
return;
}
try {
doRegist(interfaces, klass.newInstance(), rpcBeanFactory);//新增(本類中的方法)
} catch (Exception e) {
e.printStackTrace();
}
}
private static void doRegist(Class<?> interfaces, Object object, RpcBeanFactory rpcBeanFactory) {
Method[] methods = interfaces.getDeclaredMethods();//得到介面中的所有方法
for (Method method : methods) {
RpcBeanDefination rbd = new RpcBeanDefination();
rbd.setKlass(interfaces);
rbd.setMethod(method);
rbd.setObject(object);
String rpcId = String.valueOf(method.toString().hashCode());//產生方法的序列號
rpcBeanFactory.addRpcBeanMap(rpcId, rbd);//將介面中的每一個方法都註冊(儲存到rpcBeanMap中)
}
}
}
在這個類中將掃描的介面、物件、方法進行新增(在這裡新增的序列號是通過method.toString().hashCode()形成的,這樣可以保證序列的唯一性。如果只有method.hashCode()即只要是方法名稱和引數型別和個數一樣他們的序列號就一樣。如果是兩個不同類中含有相同的方法就會出現這種情況) RpcBeandDefination類:
private Class<?> klass;
private Method method;
private Object object;
在這個類中只有介面、方法、實現介面的物件三個成員以及他們的set()和get()方法 客戶端框架如圖 在客戶端有兩個類 我們先來看RpcClentExcutor類 類中成員:
private Socket socket;
private String rpcClientIp;//客戶端的IP
private int rpcServerPort;//伺服器的port
類中方法:
public class RpcClientExcutor {
//從使用者得到IP、port
RpcClientExcutor(String rpcClientIp, int rpcServerPort){
this.rpcClientIp = rpcClientIp;
this.rpcServerPort = rpcServerPort;
}
/*
*與伺服器進行互動
*/
Object rpcExcutor(String rpcId, Object[] paras) throws Exception {
socket = new Socket(rpcClientIp, rpcServerPort); //連線伺服器
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());//輸出流
oos.writeUTF(rpcId);//傳送方法的序列號
oos.writeObject(paras);//傳送方法引數
//輸入流(注:下面這一行程式碼必須要寫在傳送訊號之後,否則會發生阻塞)
ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
Object result = ois.readObject();//讀取伺服器傳送回來的資料
closeRpc(ois, oos, socket);//斷開與伺服器的連線
return result;
}
/*
*關閉客戶端
*/
void closeRpc(ObjectInputStream ois, ObjectOutputStream oos, Socket socket) {
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
ois = null;
}
try {
if (oos != null) {
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
oos = null;
}
try {
if (socket != null || !socket.isClosed()) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
socket = null;
}
}
}
在這個類中主要進行的方法是將RpcClient產生代理後的方法的引數和序列號進行傳送 RpcClient類:
public class RpcClient {
//與RpcClient進行連線
private RpcClientExcutor rpcClientExcutor;
//與客戶傳送的IP、port連線
public RpcClient(String rpcClientIp, int rpcServerPort) {
rpcClientExcutor = new RpcClientExcutor(rpcClientIp, rpcServerPort);
}
/*
*產生代理
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<?> klass) {
return (T)Proxy.newProxyInstance(klass.getClassLoader(), new Class<?>[]{klass},
new InvocationHandler() {
@Override
public Object invoke(Object arg0, Method method, Object[] args) throws Throwable {
String rpcId = String.valueOf(method.toString().hashCode());//產生方法序列號
Object result = rpcClientExcutor.rpcExcutor(rpcId, args);//呼叫RpcClientExcutor類中的rpcExcutor()方法
return result;//返回執行結果
}
});
}
}
下面我們來測試一下 首先建立一個介面
public interface ITestForRPC {
UserModel getUserById(String id);
}
然後建立一個實現該介面的類,並且帶有@RpcInterface介面(一定要實現Serializable介面,否則會因為產生不了埠號而出現異常)
@RpcInterface
public class UserAction implements ITestForRPC, Serializable {
private static final long serialVersionUID = -7874287356974650711L;
@Override
public UserModel getUserById(String id) {
UserModel user = new UserModel();
user.setId(id);
user.setName("張三");
user.setSex(true);
return user;
}
}
UserModel是一個類,其中有name,id,sex成員 然後啟動伺服器:
public class TestServer {
public static void main(String[] args) {
RpcServer rpcServer = new RpcServer(11520);//設定埠號
rpcServer.scanPackage("com.satellite.rpc.cor");//自己的包路徑
rpcServer.startRpc();
}
}
最後啟動客戶端:
public class TestClient {
public static void main(String[] args) {
RpcClient rpcClient = new RpcClient("192.168.1.206", 11520);//自己電腦的Ip和伺服器的埠號
ITestForRPC proxy = rpcClient.getProxy(ITestForRPC.class);//產生代理
UserModel userModel = proxy.getUserById("03161101");
System.out.println(userModel);//輸出userModel
}
}
執行結果: 這只是簡單的實現,還有很多需要改進的地方,比如將埠號和ip進行檔案配置;新增攔截器等。