A Simple RPC Framework in Three Hundred Lines of Code
阿新 • Published: 2018-03-11
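I spent half a day writing a simple RPC framework. The motivation came from my first reading of the Dubbo source code: Dubbo looks enormous, yet I kept getting the sense that the vast majority of its features exist to serve extensibility and service governance. I have read the source of both Dubbo and gRPC, and both are excellent RPC frameworks. To help readers who are new to RPC grasp its core quickly, I wrote this article, and I hope that anyone who goes back to the Dubbo source afterwards can read it with that core in mind.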
1: The RPC protocol interface
RpcProtocol.java
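public interface RpcProtocol {

    // Provider side: listen on the given port and expose the registered services.
    void export(int port);

    // Consumer side: return a proxy for interfaceClass that forwards calls to host:port.
    Object refer(Class interfaceClass, String host, int port);
}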
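This interface exposes only two methods because, at its core, an RPC framework needs only two operations: the consumer obtains a reference to the provider's service (refer), and the provider exposes its service so that it can receive and answer the consumer's requests (export).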
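To make the two operations concrete before any sockets are involved, here is a minimal in-process sketch. It is an illustration only, not the code from this article: the class InProcessRpcSketch and the Greeter interface are hypothetical names, and the "network" is replaced by a direct reflective call. The consumer half is a JDK dynamic proxy and the provider half is a lookup table keyed by interface name.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-process sketch: no sockets, the "wire" is a plain method call.
final class InProcessRpcSketch {

    /** Hypothetical service contract used only for this illustration. */
    interface Greeter {
        String greet(String name);
    }

    // Provider side: exported implementations, keyed by interface name.
    private final Map<String, Object> exported = new ConcurrentHashMap<>();

    /** Provider half: make an implementation reachable under its interface name. */
    <T> void export(Class<T> interfaceClass, T implementation) {
        exported.put(interfaceClass.getName(), implementation);
    }

    /** Consumer half: return a proxy that forwards every call to dispatch(). */
    @SuppressWarnings("unchecked")
    <T> T refer(Class<T> interfaceClass) {
        InvocationHandler handler = (proxy, method, args) ->
                dispatch(interfaceClass.getName(), method.getName(),
                        method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, handler);
    }

    /** Stands in for the wire: find the target by name and invoke it reflectively. */
    private Object dispatch(String interfaceName, String methodName,
                            Class<?>[] parameterTypes, Object[] args) throws Exception {
        Object target = exported.get(interfaceName);
        if (target == null) {
            throw new IllegalStateException("No provider exported for " + interfaceName);
        }
        Method method = Class.forName(interfaceName).getMethod(methodName, parameterTypes);
        return method.invoke(target, args);
    }

    public static void main(String[] args) {
        InProcessRpcSketch rpc = new InProcessRpcSketch();
        rpc.export(Greeter.class, name -> "hello, " + name);
        Greeter greeter = rpc.refer(Greeter.class);
        System.out.println(greeter.greet("zhangsan"));
    }
}
```

Everything a real framework adds (serialization, sockets, service discovery) sits between the proxy's handler and dispatch(); the two entry points stay the same.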
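The concrete implementation follows. The code is not complicated; you can paste it straight into IDEA and step through it in the debugger.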
2: The concrete implementation of the RPC protocol
RpcCore.java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RpcCore implements RpcProtocol {

    private Socket socket;
    private ObjectOutputStream objectOutputStream;
    private ObjectInputStream objectInputStream;
    private ServerSocket serverSocket;
    // Provider side: service key ("interface&group&version") -> implementation instances.
    private Map<String, List<Object>> services = new ConcurrentHashMap<String, List<Object>>();
    // Consumer side: interface name -> attributes (group, version, fullName).
    private Map<String, Map<String, Object>> interfaceAttributes = new ConcurrentHashMap<>();

    @Override
    public void export(int port) {
        start(port);
    }

    @Override
    public Object refer(final Class interfaceClass, String host, int port) {
        connect(host, port);
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String interfaceName = interfaceClass.getName();
                        String fullName = (String) interfaceAttributes.get(interfaceName).get("fullName");
                        return get(fullName, method, args);
                    }
                });
    }

    // Sends one request and reads one response. The streams (and with them the socket)
    // are closed afterwards, so each proxy returned by refer() supports a single call.
    public Object get(String interfaceFullName, Method method, Object[] params) {
        Object result = null;
        try {
            // The server reads these four fields back in exactly this order.
            objectOutputStream.writeUTF(interfaceFullName);
            objectOutputStream.writeUTF(method.getName());
            objectOutputStream.writeObject(method.getParameterTypes());
            objectOutputStream.writeObject(params);
            objectOutputStream.flush();
            result = objectInputStream.readObject();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            try {
                if (objectOutputStream != null) {
                    objectOutputStream.close();
                }
                if (objectInputStream != null) {
                    objectInputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result;
    }

    private void start(int port) {
        try {
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress("localhost", port));
            init();
        } catch (IOException e) {
            e.printStackTrace();
            return; // Without a bound socket there is nothing to accept.
        }
        System.out.println("server started successfully, port is --->" + port);
        while (true) {
            try {
                // One thread per connection; each connection carries one request.
                Socket socket = serverSocket.accept();
                new Thread(new Processor(socket, services)).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Registers the demo provider under its service key.
    public void init() {
        RpcDemo rpcDemo = new RpcDemoImplProvider();
        String group = "rpcDemo";
        String version = "1.0.0";
        String fullName = RpcDemo.class.getName() + "&" + group + "&" + version;
        List<Object> rpcDemoInstances = services.get(fullName);
        if (rpcDemoInstances == null) {
            rpcDemoInstances = new ArrayList<>();
            rpcDemoInstances.add(rpcDemo);
        }
        services.put(fullName, rpcDemoInstances);
    }

    public void connect(String host, int port) {
        try {
            storeInterface();
            socket = new Socket();
            socket.connect(new InetSocketAddress(host, port));
            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectInputStream = new ObjectInputStream(socket.getInputStream());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Records the group, version and service key for the demo interface.
    private void storeInterface() {
        String group = "rpcDemo";
        String version = "1.0.0";
        String interfaceName = RpcDemo.class.getName();
        String fullName = interfaceName + "&" + group + "&" + version;
        // Look up and store under the same key (the interface name).
        Map<String, Object> attributes = interfaceAttributes.get(interfaceName);
        if (attributes == null) {
            attributes = new ConcurrentHashMap<>(100);
            attributes.put("group", group);
            attributes.put("version", version);
            attributes.put("fullName", fullName);
        }
        interfaceAttributes.put(interfaceName, attributes);
    }

    // Handles one request on one connection: decode, invoke reflectively, reply.
    class Processor implements Runnable {

        private Socket socket;
        private ObjectInputStream objectInputStream;
        private ObjectOutputStream objectOutputStream;
        private Map<String, List<Object>> services;

        private Processor(Socket socket, Map<String, List<Object>> services) {
            this.socket = socket;
            this.services = services;
        }

        @Override
        public void run() {
            // Log the client's port.
            System.out.println(((InetSocketAddress) socket.getRemoteSocketAddress()).getPort());
            try {
                objectInputStream = new ObjectInputStream(socket.getInputStream());
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                String interfaceFullName = objectInputStream.readUTF();
                String methodName = objectInputStream.readUTF();
                Class[] parameterTypes = (Class[]) objectInputStream.readObject();
                Object[] arguments = (Object[]) objectInputStream.readObject();
                // The service key is "interface&group&version"; the first part is the class name.
                String interfaceName = interfaceFullName.split("&")[0];
                Class service = Class.forName(interfaceName);
                Method method = service.getMethod(methodName, parameterTypes);
                Object instance = services.get(interfaceFullName).get(0);
                Object result = method.invoke(instance, arguments);
                objectOutputStream.writeObject(result);
                objectOutputStream.flush();
                objectOutputStream.close();
                objectInputStream.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Release the connection even if the request failed part-way through.
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
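The request that travels over the socket is just four fields written in a fixed order: the service key, the method name, the parameter types, and the arguments. The following sketch round-trips such a request through a byte array instead of a socket so the framing can be inspected in isolation. RequestFrameSketch and the service key used in main are hypothetical; only the field order is taken from RpcCore above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

// Hypothetical helper: encodes and decodes one request without any networking.
final class RequestFrameSketch {

    /** Writes the four request fields in the order the consumer side uses. */
    static byte[] encode(String serviceKey, String methodName,
                         Class<?>[] parameterTypes, Object[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeUTF(serviceKey);
            out.writeUTF(methodName);
            out.writeObject(parameterTypes);
            out.writeObject(args);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order and renders them as text. */
    static String describe(byte[] frame) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(frame))) {
            String serviceKey = in.readUTF();
            String methodName = in.readUTF();
            Class<?>[] parameterTypes = (Class<?>[]) in.readObject();
            Object[] args = (Object[]) in.readObject();
            if (parameterTypes.length != args.length) {
                throw new IOException("parameter types and arguments differ in length");
            }
            // The service key is "interface&group&version"; keep only the interface name.
            String interfaceName = serviceKey.split("&")[0];
            return interfaceName + "#" + methodName + Arrays.toString(args);
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] frame = encode("demo.RpcDemo&rpcDemo&1.0.0", "getStudent",
                new Class<?>[] {Integer.class, String.class},
                new Object[] {111, "zhangsan"});
        System.out.println(describe(frame));
    }
}
```

Because both sides rely on the read order matching the write order, any change to the request layout has to be made on the consumer and the provider at the same time.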
3: The RPC test interface
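An interface is, put bluntly, a protocol. In essence it is no different from HTTP, MQTT, or any other protocol, except that RPC is a strong dependency (the consumer compiles against the provider's interface and types), while the other two are weak dependencies. The entity is declared as a nested class to express one idea: the entity classes and exceptions of an RPC service are part of the protocol and should live together with it.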
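As a small illustration of keeping entities and exceptions inside the contract, consider the sketch below. All names in it (AccountService, Account, AccountNotFoundException, ContractSketch) are hypothetical and do not appear in this article's framework; the in-memory provider exists only to exercise the contract.

```java
import java.io.Serializable;

// Hypothetical contract: the interface, its entity and its exception ship together.
interface AccountService {

    Account find(long id) throws AccountNotFoundException;

    // Entity carried over the wire, so it must be serializable.
    final class Account implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        final String owner;

        Account(long id, String owner) {
            this.id = id;
            this.owner = owner;
        }
    }

    // Checked exception declared by the contract; Throwable already implements Serializable.
    final class AccountNotFoundException extends Exception {
        private static final long serialVersionUID = 1L;

        AccountNotFoundException(long id) {
            super("no account with id " + id);
        }
    }
}

final class ContractSketch {

    /** In-memory stand-in for a provider, used only to exercise the contract. */
    static AccountService inMemoryProvider() {
        return id -> {
            if (id == 1L) {
                return new AccountService.Account(1L, "zhangsan");
            }
            throw new AccountService.AccountNotFoundException(id);
        };
    }

    public static void main(String[] args) {
        AccountService service = inMemoryProvider();
        try {
            System.out.println(service.find(1L).owner);
            service.find(2L);
        } catch (AccountService.AccountNotFoundException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A consumer that depends on this one interface gets the method signatures, the data shape, and the failure modes in a single artifact, which is the sense in which the interface is the protocol.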
RpcDemo.java
import java.io.Serializable;

public interface RpcDemo {

    Student getStudent(Integer id, String name);

    // The entity is nested in the interface because it is part of the contract.
    class Student implements Serializable {
        private long id;
        private String name;
        private int age;
        private boolean man;

        public Student(long id, String name, int age, boolean man) {
            this.id = id;
            this.name = name;
            this.age = age;
            this.man = man;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public boolean isMan() {
            return man;
        }

        public void setMan(boolean man) {
            this.man = man;
        }

        @Override
        public String toString() {
            return "Student{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    ", man=" + man +
                    '}';
        }
    }
}
4: The interface implementation
RpcDemoImplProvider.java
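public class RpcDemoImplProvider implements RpcDemo {

    @Override
    public Student getStudent(Integer id, String name) {
        // The demo ignores its arguments and always returns the same student.
        return new Student(1234, "zhangsan", 20, true);
    }

    public static void main(String[] args) {
        RpcCore rpcCore = new RpcCore();
        rpcCore.export(8087); // Blocks and serves requests on port 8087.
    }
}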
5: RpcDemoConsumer.java
The consumer side.
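public class RpcDemoConsumer {

    public static void main(String[] args) {
        RpcCore rpcCore = new RpcCore();
        RpcDemo rpcDemo = (RpcDemo) rpcCore.refer(RpcDemo.class, "127.0.0.1", 8087);
        // The request is only sent when a method is invoked on the proxy,
        // so make the call before reporting success.
        RpcDemo.Student student = rpcDemo.getStudent(111, "zhangsan");
        System.out.println("Remote call succeeded");
        System.out.println("The returned result is ---->" + student);
    }
}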
6: Start RpcDemoImplProvider first
7: Then start RpcDemoConsumer
In total that is only about 250 lines of code. Over.