用Java實現RPC框架例項
一、RPC簡介
RPC,全稱為Remote Procedure Call,即遠端過程呼叫,它是一個計算機通訊協議。它允許像呼叫本地服務一樣呼叫遠端服務。它可以有不同的實現方式。如RMI(遠端方法呼叫)、Hessian、Http invoker等。另外,RPC是與語言無關的。
RPC示意圖
如上圖所示,假設Computer1在呼叫sayHi()方法,對於Computer1而言呼叫sayHi()方法就像呼叫本地方法一樣,呼叫 –>返回。但從後續呼叫可以看出Computer1呼叫的是Computer2中的sayHi()方法,RPC遮蔽了底層的實現細節,讓呼叫者無需關注網路通訊,資料傳輸等細節。
二、RPC框架的實現
上面介紹了RPC的核心原理:RPC能夠讓本地應用簡單、高效地呼叫伺服器中的過程(服務)。它主要應用在分散式系統。如Hadoop中的IPC元件。但怎樣實現一個RPC框架呢?
從下面幾個方面思考,僅供參考:
1.通訊模型:假設通訊的為A機器與B機器,A與B之間有通訊模型,在Java中一般基於BIO或NIO;。
2.過程(服務)定位:使用給定的通訊方式,與確定IP與埠及方法名稱確定具體的過程或方法;
3.遠端代理物件:本地呼叫的方法(服務)其實是遠端方法的本地代理,因此可能需要一個遠端代理物件,對於Java而言,遠端代理物件可以使用Java的動態物件實現,封裝了呼叫遠端方法呼叫;
4.序列化,將物件名稱、方法名稱、引數等物件資訊進行網路傳輸需要轉換成二進位制傳輸,這裡可能需要不同的序列化技術方案。如:protobuf,Arvo等。
三、Java實現RPC框架
1、實現技術方案
下面使用比較原始的方案實現RPC框架,採用Socket通訊、動態代理與反射與Java原生的序列化。
2、RPC框架架構
RPC架構分為三部分:
1)服務提供者,執行在伺服器端,提供服務介面定義與服務實現類。
2)服務中心,執行在伺服器端,負責將本地服務釋出成遠端服務,管理遠端服務,提供給服務消費者使用。
3)服務消費者,執行在客戶端,通過遠端代理物件呼叫遠端服務。
3、 具體實現
服務提供者介面定義與實現,程式碼如下:
HelloServices介面實現類:
public interface HelloService {
String sayHi(String name);
}
public class HelloServiceImpl implements HelloService {
@Override
public String sayHi(String name) {
return "hi " + name;
}
}
服務中心程式碼實現,程式碼如下:
public interface Server {
void stop();
void start() throws IOException;
void register(Class serviceInterface, Class impl);
boolean isRunning();
int getPort();
}
服務中心實現類:
package com.example.demoms.rpc.server;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServerImpl implements Server {
private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private Map<String, Class> serviceRegistry = new HashMap<>();
private boolean isRunning = false;
private int port;
public ServerImpl(int port) {
this.port = port;
}
public ServerImpl() {
}
@Override
public void stop() {
isRunning = false;
executorService.shutdown();
}
@Override
public void start() throws IOException {
try (ServerSocket serverSocket = new ServerSocket(port)) {
//serverSocket.bind(new InetSocketAddress(port));
System.out.println("server start...");
try {
while (true) {
executorService.execute(new ServerTask(serverSocket.accept()));
}
} finally {
serverSocket.close();
}
}
}
@Override
public void register(Class serviceInterface, Class impl) {
serviceRegistry.put(serviceInterface.getName(), impl);
}
@Override
public boolean isRunning() {
return isRunning;
}
@Override
public int getPort() {
return port;
}
class ServerTask implements Runnable {
private Socket client;
public ServerTask(Socket accept) {
this.client = accept;
}
@Override
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(client.getInputStream());
String serviceName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class<?>[] argsType = ((Class<?>[]) inputStream.readObject());
Object[] args = (Object[]) inputStream.readObject();
Class serviceClass = serviceRegistry.get(serviceName);
if (serviceClass == null) {
throw new ClassNotFoundException("遠端呼叫類不存在:" + serviceName);
}
Method method = serviceClass.getMethod(methodName, argsType);
Object result = method.invoke(serviceClass.newInstance(), args);
outputStream = new ObjectOutputStream(client.getOutputStream());
outputStream.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (outputStream != null) {
outputStream.close();
}
if (inputStream != null) {
inputStream.close();
}
if (client != null) {
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客戶端的遠端代理物件:
package com.example.demoms.rpc.consumer;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
public class RPCClient {
public static <T> T getRemoteProxyObj(final Class<?> serviceInterface, final InetSocketAddress address) {
return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class[]{serviceInterface},
(proxy, method, args) -> {
Socket socket = null;
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
socket = new Socket();
socket.connect(address);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(serviceInterface.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (outputStream != null) outputStream.close();
if (inputStream != null) inputStream.close();
if (socket != null) socket.close();
}
});
}
}
最後為測試類:
package com.example.demoms.rpc.consumer;
import com.example.demoms.rpc.provider.HelloService;
import com.example.demoms.rpc.provider.HelloServiceImpl;
import com.example.demoms.rpc.server.Server;
import com.example.demoms.rpc.server.ServerImpl;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RPCTest {
public static void main(String[] args) {
new Thread(() -> {
Server server = new ServerImpl(8888);
server.register(HelloService.class, HelloServiceImpl.class);
try {
server.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
HelloService service = RPCClient.getRemoteProxyObj(HelloService.class, new InetSocketAddress("localhost", 8888));
System.out.println(service.sayHi("hello"));
}
}
執行結果:
server start...
hi hello
四、總結
RPC本質為訊息處理模型,RPC遮蔽了底層不同主機間的通訊細節,讓程序呼叫遠端的服務就像是本地的服務一樣。
五、可以改進的地方
這裡實現的簡單RPC框架是使用Java語言開發,與Java語言高度耦合,並且通訊方式採用的Socket是基於BIO實現的,IO效率不高,還有Java原生的序列化機制佔記憶體太多,執行效率也不高。可以考慮從下面幾種方法改進。
可以採用基於JSON資料傳輸的RPC框架;
可以使用NIO或直接使用Netty替代BIO實現;
使用開源的序列化機制,如Hadoop Avro與Google protobuf等;
服務註冊可以使用Zookeeper進行管理,能夠讓應用更加穩定。