1. 程式人生 > 實用技巧 >就靠這幾段程式碼,帶你玩轉rpc通訊協議,不信你學不明白

就靠這幾段程式碼,帶你玩轉rpc通訊協議,不信你學不明白

RPC(Remote Procedure Call Protocol)——遠端過程呼叫協議,它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路技術的協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通訊程式之間攜帶資訊資料。在OSI網路通訊模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網路分散式多程式在內的應用程式更加容易。

2.基本概念

  • RPC(Remote Procedure Call)遠端過程呼叫,簡單的理解是一個節點請求另一個節點提供的服務
  • 本地過程呼叫:如果需要將本地student物件的age+1,可以實現一個addAge()方法,將student物件傳入,對年齡進行更新之後返回即可,本地方法呼叫的函式體通過函式指標來指定。
  • 遠端過程呼叫:上述操作的過程中,如果addAge()這個方法在服務端,執行函式的函式體在遠端機器上,如何告訴機器需要呼叫這個方法呢?

今天,我們就通過一個例項程式碼進行演示,一步步的檢視,rpc的通訊是如何進行的,有興趣的朋友可以把程式碼進行實現,自己debug一下,檢視每一步的傳參

1

package com.mashibing.rpc.common;

import java.io.Serializable;
​
public class User implements Serializable {
  private static final long serialVersionUID = 1L;
​
  private Integer id;
  private String name;
​
  public User(Integer id, String name) {
    this.id = id;
    this.name = name;
   }
​
  public Integer getId() {
    return id;
   }
​
  public String getName() {
    return name;
   }
​
  public void setId(Integer id) {
    this.id = id;
   }
​
  public void setName(String name) {
    this.name = name;
   }
​
  @Override
  public String toString() {
    return "User{" +
        "id=" + id +
        ", name='" + name + '\'' +
        '}';
   }
}
​
package com.mashibing.rpc.common;
​
public interface IUserService {
  public User findUserById(Integer id);
}
package com.mashibing.rpc01;
​
​
import com.mashibing.rpc.common.IUserService;
import com.mashibing.rpc.common.User;
​
public class UserServiceImpl implements IUserService {
  @Override
  public User findUserById(Integer id) {
    return new User(id, "Alice");
   }
}
​
package com.mashibing.rpc01;
​
import com.mashibing.rpc.common.User;
​
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
​
public class Client {
  public static void main(String[] args) throws Exception {
    Socket s = new Socket("127.0.0.1", 8888);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(123);
​
    s.getOutputStream().write(baos.toByteArray());
    s.getOutputStream().flush();
​
    DataInputStream dis = new DataInputStream(s.getInputStream());
    int id = dis.readInt();
    String name = dis.readUTF();
    User user = new User(id, name);
​
    System.out.println(user);
​
    dos.close();
    s.close();
   }
}
​

2

package com.mashibing.rpc02;
​
import com.mashibing.rpc.common.User;
​
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;
​
public class Stub {
  public User findUserById(Integer id) throws Exception {
    Socket s = new Socket("127.0.0.1", 8888);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeInt(123);
​
    s.getOutputStream().write(baos.toByteArray());
    s.getOutputStream().flush();
​
    DataInputStream dis = new DataInputStream(s.getInputStream());
    int receivedId = dis.readInt();
    String name = dis.readUTF();
    User user = new User(id, name);
​
    dos.close();
    s.close();
    return user;
   }
}
​

3

package com.mashibing.rpc03;
​
import com.mashibing.rpc.common.IUserService;
import com.mashibing.rpc.common.User;
​
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
​
/**
 * 而且Client的呼叫顯得不是很合理(Stub裡只有findById的程式碼),如果有個findByName的新方法,那麼就又得重新改進
 * 下面這種寫法解決了方法增加的問題
 */
​
public class Stub {
  public static IUserService getStub() {
    InvocationHandler h = new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Socket s = new Socket("127.0.0.1", 8888);
​
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(123);
​
        s.getOutputStream().write(baos.toByteArray());
        s.getOutputStream().flush();
​
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id, name);
​
        dos.close();
        s.close();
        return user;
       }
     };
    Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
    return (IUserService)o;
   }
}
​

4

package com.mashibing.rpc04;
​
import com.mashibing.rpc.common.IUserService;
import com.mashibing.rpc.common.User;
​
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
​
/**
 * 但是這裡僅僅實現了findByUserId的方法代理,如果要實現其他方法的代理該怎麼做呢?
 * 這裡就要從協議層做出改進
 *
 * 伺服器端也要做出對應處理
 */
​
public class Stub {
  public static IUserService getStub() {
    InvocationHandler h = new InvocationHandler() {
      @Override
      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Socket s = new Socket("127.0.0.1", 8888);
​
        ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream());
​
        String methodName = method.getName();
        Class[] parametersTypes = method.getParameterTypes();
        oos.writeUTF(methodName);
        oos.writeObject(parametersTypes);
        oos.writeObject(args);
        oos.flush();
​
​
        DataInputStream dis = new DataInputStream(s.getInputStream());
        int id = dis.readInt();
        String name = dis.readUTF();
        User user = new User(id, name);
​
        oos.close();
        s.close();
        return user;
       }
     };
    Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[] {IUserService.class}, h);
    return (IUserService)o;
   }
​
​
​
​
}
​
package com.mashibing.rpc04;
​
import com.mashibing.rpc.common.IUserService;
import com.mashibing.rpc.common.User;
​
import java.io.*;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
​
public class Server {
  private static boolean running = true;
  public static void main(String[] args) throws Exception {
    ServerSocket ss = new ServerSocket(8888);
    while (running) {
      Socket s = ss.accept();
      process(s);
      s.close();
     }
    ss.close();
   }
​
  private static void process(Socket s) throws Exception {
    InputStream in = s.getInputStream();
    OutputStream out = s.getOutputStream();
    ObjectInputStream oos = new ObjectInputStream(in);
    DataOutputStream dos = new DataOutputStream(out);
​
    String methodName = oos.readUTF();
    Class[] parameterTypes = (Class[])oos.readObject();
    Object[] args = (Object[])oos.readObject();
​
    IUserService service = new UserServiceImpl();
    Method method = service.getClass().getMethod(methodName, parameterTypes);
    User user = (User)method.invoke(service, args);
​
​
​
    dos.writeInt(user.getId());
    dos.writeUTF(user.getName());
    dos.flush();
   }
}
​

5

返回值用Object封裝,支援任意型別


就這樣,直到最後的server端獲取到資料,完成了一個數據傳輸,不知道大家能不能理解,最後,紙上得來終覺淺,我建議大家還是能夠可以自己實現一下。

關注我,一個思維跳脫並且有點灑脫的程式設計師,謝謝

最近在整理我這幾年工作以及生活中對我幫助較大的視訊以及資料,有需要的,可以去我git倉庫中自取

連結在這裡:https://gitee.com/biwangsheng/personal.git