1. 程式人生 > 其它 >網路程式設計3

網路程式設計3

技術標籤:網路程式設計網路

網路通訊基本常識

  • 程式設計中的Socket是什麼?

    • Socket是應用層與TCP/IP協議族通訊的間軟體抽象層,它是一組介面,其實就是一個門面模式。TCP用主機的IP地址加上主機上的埠號作為TCP連線的端點,這種端點就叫做套接字(socket)

    • socket是一種門面模式,遮蔽了與機器的交流過程,方便業務程式設計師開發程式碼

    • A的socket ---- B的socket

      兩個socket建立關係,從而A和B就可以通訊了

  • 短連線

    • 一次通訊完了馬上就關閉連線

    • 伺服器要服務成千上億的服務

    • 短連線和長連線到底如何選擇,需要根據實際情況

  • 長連線

    • 一次連線內會進行多次通訊

    • 維持住長連線主要消耗的是記憶體和FD檔案描述符

    • 如果伺服器效能好,可以維持上百萬的長連線

    • 操作頻繁、或者點對點的通訊上,連線池來維護這種長連線

  • InetAddress類 --> 只表示地址(主機)

    • InetAddress address = InetAddress.getByName(“www.baidu.com”)

    • 可以獲得百度的ip地址

    • 怎麼獲得的?-- dns解析

    • InetAddress address2 = InetAddress.getByName(“124.232.170.22”)

      System.out.println(address2.getHostName())

      同樣可以根據ip地址獲取域名,如果找不到,就會列印源ip地址

    • InetAddress[] allIp= InetAddress.getAllByName(“www.baidu.com”)

      獲取百度域名下面所有的ip地址

    • 把一個ip地址包裝成InetAddress物件

      byte[] bytes = {(byte)192,(byte)168,56,1};

      InetAddress address = InetAddress.getByAddress(bytes);

    InetSocketAddress類 --> 主機名+埠

    NetoworkInterface類

    • 開啟裝置管理器-網路介面卡

      NetoworkInterface會把這些全部找出來,除了這些,還有別的

      // 127.0.0.1-------本地迴環介面

      InetAddress address = InetAddress.getByName(“127.0.0.1”);

      NetworkInterface byInetAddress = NetoworkInterface.getByInetAddress(address);

      // 獲取所有的網路通訊介面類

      // 包括本地迴環、網路介面卡、wifi、廣域網、硬體裝置通訊

      NetoworkInterface.getNetoworkInterfaces();

  • 服務端、客戶端、通訊程式設計關注的三件事

    • 所有網路通訊一定有服務端和客戶端這兩樣

    • 提供服務的稱為服務端

    • 連線服務的稱為客戶端

    • 某個類有Server、ServerSocket,那麼這個類往往是給服務端使用的

      ServerSocket只是個容器,容納網路服務用的

    • 某個類只有socket,一般是負責具體的網路讀寫

      真實網路通訊中,真正進行網路讀寫的都是socket

  • 網路程式設計中一定要關注的點

    • 網路連線
    • 讀網路資料
    • 寫網路資料
  • jdk網路通訊

    • bio

    • nio

    • aio—非主流

原生JDK網路程式設計-BIO

  • BIO:block ,即阻塞式io

  • ServerSocket負責繫結IP地址,啟動監聽埠;

  • Socket負責發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊

  • ServerSocket啟動了處於監聽狀態
    
    通過accept()方法來響應請求的socket()
    
    新生成一個執行緒,線上程中生成一個socket()來與請求的socket()通訊
    

伺服器端

  • package WangLuoBianCheng.wangLuoBianCheng3;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class Server{
    
        public static void main(String[] args)throws IOException {
    
            ServerSocket server = new ServerSocket();
            server.bind(new InetSocketAddress(10001));
            System.out.println("Server is started...");
            while(true){
                // 來一個socket請求,就會new一個執行緒,同時new 一個socket
                // 執行server.accept()成功,如果三次握手完成會返回一個Socket
                // 更優的寫法是用執行緒池,實現複用
                new Thread(new ServerTask(server.accept())).start();
    
            }
    
        }
    
    
        private static class ServerTask implements Runnable{
    
            private Socket socket = null;
    
            public ServerTask(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run(){
                try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())){
    
                    String userName = input.readUTF();
                    System.out.println("Accept client Message:"+userName);
    
                    output.writeUTF("Hello,"+userName);
                    // 上一句只是寫入快取,準備傳送
                    // 強制刷出
                    output.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }
    
    

客戶端

  • package WangLuoBianCheng.wangLuoBianCheng3;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    
    public class Client {
    
        public static void main(String[] args) throws IOException {
            // 用來通訊的socket
            Socket socket = null;
    
            // 注意客戶端必須是輸出在上,輸入在下,確保流通道建立起來
            // 因為如果兩端都是輸入,怎麼傳遞?
            // 或者兩端都是先建立輸出流,再建立輸入流,未測試?
            ObjectOutputStream objectOutputStream = null;
            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 10001);
            ObjectInputStream objectInputStream = null;
    
            try {
                socket = new Socket();
                // 建立連線
                socket.connect(inetSocketAddress);
    
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectInputStream = new ObjectInputStream(socket.getInputStream());
    
                // 發出訊息
                objectOutputStream.writeUTF("James");
                objectOutputStream.flush();
    
                // 接受伺服器響應的資訊並列印
                // 執行完後關閉了網路連線
                System.out.println(objectInputStream.readUTF());
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (socket!=null) socket.close();
    
                if (objectOutputStream!=null) objectOutputStream.close();
    
                if (objectInputStream!=null) objectInputStream.close();
            }
        }
    }
    
    

改進—偽非同步IO模型

  • 上述伺服器端是來一個socket請求,就會new一個執行緒,同時new 一個socket

  • 為了實現執行緒複用,應該採用執行緒池的執行緒來處理任務,而這種模式又稱為偽非同步IO模型

  • package WangLuoBianCheng.wangLuoBianCheng3;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ServerPool {
    
        private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
    
    
        public static void main(String[] args)throws IOException {
    
            ServerSocket server = new ServerSocket();
            server.bind(new InetSocketAddress(10001));
            System.out.println("Server is started...");
            while(true){
                // 執行server.accept()成功,如果三次握手完成會返回一個Socket
                executorService.execute(new ServerTask(server.accept()));
            }
    
        }
    
    
        private static class ServerTask implements Runnable{
    
            private Socket socket = null;
    
            public ServerTask(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run(){
                try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())){
    
                    String userName = input.readUTF();
                    System.out.println("Accept client Message:"+userName);
    
                    output.writeUTF("Hello,"+userName);
                    // 強制刷出
                    output.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }
    
    

BIO應用-RPC框架

  • 單WEB專案

    • 1.訂單服務
    • 2.扣減庫存服務
    • 3.傳送簡訊提示服務
  • 之前上述三個服務全是單執行緒執行,當呼叫服務的使用者數很大時,就不能充分發揮併發程式設計的高效,因此考慮將三個服務拆分,都分別由單獨的執行緒來執行

  • 進一步把不同的服務部署到不同的伺服器上,而且每個服務是通過伺服器叢集的形式來部署,從而形成了分散式的架構

    • 這時不同服務之間的呼叫就需要引入rpc,因為是跨伺服器跨網路的呼叫,不再是單機內部的方法呼叫

什麼是RPC?

  • RPC(Remote Procedure Call ——遠端過程呼叫),它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路的技術。–來自《百度百科》

客戶端存根–server stub

  • 扣減庫存服務,為了使呼叫遠端服務和呼叫本地服務沒有什麼區別,就需要引入客戶端存根

  • 客戶端存根就是遠端方法在本地的模擬,包括方法名、方法引數

服務端存根–client stub

  • 負責把客戶端請求發來的請求體,解析成跟伺服器實際方法相同的方法名、方法引數

總結

  • rpc方法就是把上述過程全部包裝起來,使得呼叫遠端方法就跟呼叫本地方法一樣

RPC和HTTP

  • TCP/IP概念層模型功能TCP/IP協議族
    檔案傳輸、電子郵件、檔案服務、虛擬終端TFTP、HTTP、SNMP、FTP、SMTP、DNS、Telnet
    應用層資料格式化、程式碼轉換、資料加密
    解除或建立與別的接點的聯絡
    傳輸層提供端與端的介面TCPUDP
    網路層為資料包選擇路由IP、ICMP、RIP、OSPF、BGP、IGMP
    鏈路層傳輸有地址的幀以及錯誤檢測功能SLIP、CSLIP、PPP、ARP、RARP、MTU
    以二進位制資料形式在物理媒體上傳輸資料ISO2110、IEEE802、IEEE802.2

rpc究竟在上面協議分層的哪一層?

  • rpc只是一種思想,對不同服務呼叫的一種描述,既可以通過http實現,也可以通過tcp、htp,所以rpc和http不是一個層級的東西

實現RPC框架需要解決的那些問題?

  • 通訊問題

    • 不要每次方法呼叫,都需要建立socket連線
  • 代理問題

    • 可以不可以通過代理的方式,每次的通訊都通過指定的代理來解決

    • 代理問題的解決

      代理模式,用動態代理,為什麼不用靜態代理?-- 針對每一個服務都要建立相應的代理類,而rpc框架不知道要建立哪些靜態代理,所以使用動態代理,不管你要呼叫什麼服務,通過動態代理一把解決

  • 序列化問題

    • 訊息在網路中傳輸是位元組的形式,怎麼把01位元組變成javaBean的形式

    • 序列化問題的解決

      實現Serializable介面,但是jdk的序列化效能很差

      測試結果1-----jdk的位元組碼長度:133 自己位元組碼長度:24

      測試結果2-----jdk序列化耗時:1364ms 自己序列化耗時:108ms

  • 服務例項化

    • 方法呼叫時只有一個方法名和方法引數,怎麼轉化成呼叫具體的服務

    • 服務例項化的解決

      通過反射解決

序列化耗時測試

測試1

  • package WangLuoBianCheng.wangLuoBianCheng3.rpc.prepare.serial.protogenesis;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    
    /**
     * @author Mark老師   享學課堂 https://enjoy.ke.qq.com
     * 類說明:測試序列化後位元組大小
     */
    public class TestUserInfo {
    
        /**
         * @param args
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
    		UserInfo info = new UserInfo();
    		info.buildUserID(100).buildUserName("Welcome to Netty");
    		//使用jdk的序列化
    		ByteArrayOutputStream bos = new ByteArrayOutputStream();
    		ObjectOutputStream os = new ObjectOutputStream(bos);
    		os.writeObject(info);
    		os.flush();
    		os.close();
    		byte[] b = bos.toByteArray();
    		System.out.println("The jdk serializable length is : " + b.length);
    		bos.close();
    		//使用自行的序列化
    		System.out.println("-------------------------------------");
    		System.out.println("The byte array serializable length is : "
    			+ info.codeC().length);
        }
    
    }
    
    

測試2

  • package WangLuoBianCheng.wangLuoBianCheng3.rpc.prepare.serial.protogenesis;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.nio.ByteBuffer;
    
    /**
     * @author Mark老師   享學課堂 https://enjoy.ke.qq.com
     * 類說明:測試序列化效能差異
     */
    public class PerformTestUserInfo {
    
        public static void main(String[] args) throws IOException {
            UserInfo info = new UserInfo();
            info.buildUserID(100).buildUserName("Welcome to Netty");
            int loop = 1000000;
    
            //使用jdk的序列化
            ByteArrayOutputStream bos = null;
            ObjectOutputStream os = null;
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < loop; i++) {
                bos = new ByteArrayOutputStream();
                os = new ObjectOutputStream(bos);
                os.writeObject(info);
                os.flush();
                os.close();
                byte[] b = bos.toByteArray();
                bos.close();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("The jdk serializable cost time is  : "
                + (endTime - startTime) + " ms");
    
            //使用自行的序列化
            System.out.println("-------------------------------------");
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            startTime = System.currentTimeMillis();
            for (int i = 0; i < loop; i++) {
                byte[] b = info.codeC(buffer);
            }
            endTime = System.currentTimeMillis();
            System.out.println("The byte array serializable cost time is : "
                + (endTime - startTime) + " ms");
    
            }
    
    }
    
    

實現rpc框架

  • 1.服務端定義介面和服務實現類並且註冊服務
  • 2.客戶端使用動態代理呼叫服務(動態代理)
  • 3.客戶端代理把呼叫物件、方法、引數序列化成資料
  • 4.客戶端代理與服務端通過Socket通訊傳輸資料
  • 5.服務端反序列化資料成物件、方法、引數。
  • 6.服務端代理拿到這些物件和引數後通過反射的機制呼叫服務的例項。

將簡訊服務拆分成rpc服務—服務端

  • 服務實體類,服務需要的相關引數型別 -----> 這些可以看出服務端存根

所有服務使用的服務框架----RpcServerFrame

  • package cn.enjoyedu.rpc.rpc.base;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     *@author Mark老師  
     *
     *類說明:rpc框架的服務端部分
     */
    @Service
    public class RpcServerFrame {
    
    //    @Autowired
    //    private RegisterService registerService;
        @Autowired
        private RegisterServiceWithRegCenter registerServiceWithRegCenter;
    
        //服務的埠號
        private int port;
    
        /*處理服務請求任務*/
        private static class ServerTask implements Runnable{
    
            private Socket socket;
            private RegisterServiceWithRegCenter registerServiceWithRegCenter;
    
            public ServerTask(Socket client,
                              RegisterServiceWithRegCenter registerServiceWithRegCenter) {
                this.socket = client;
                this.registerServiceWithRegCenter = registerServiceWithRegCenter;
            }
    		
            // 接受客戶端的請求,並呼叫實際的方法
            // 接受的內容---1.方法所在的類名介面名 2.呼叫的方法名  3.方法引數及方法屬性、具體的引數值
            // 
            @Override
            public void run() {
                try(
                     ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                      ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
    
                    /*方法所在類名介面名*/
                    // 讀字串inputStream.readUTF()
                    String serviceName = inputStream.readUTF();
                    /*方法的名字*/
                    String methodName = inputStream.readUTF();
                    /*方法的入參型別*/
                    // 讀物件inputStream.readObject()
                    // 強制轉型,轉成類
                    Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
                    /*方法的入參的值*/
                    Object[] args = (Object[]) inputStream.readObject();
    
                    /*從容器中拿到服務的Class物件*/
                    Class serviceClass = registerServiceWithRegCenter.getLocalService(serviceName);
                    if(serviceClass == null){
                        throw new ClassNotFoundException(serviceName+ " not found");
                    }
    
                    /*通過反射,執行實際的服務*/
                    // 通過反射獲取具體的方法Method物件
                    // 入參包括兩個,方法名和引數型別陣列
                    Method method = serviceClass.getMethod(methodName, paramTypes);
                    // 通過動態代理呼叫響應的方法
                    Object result  = method.invoke(serviceClass.newInstance(),args);
    
                    /*將服務的執行結果通知呼叫者*/
                    // 將呼叫結果輸出給呼叫者,即客戶端
                    outputStream.writeObject(result);
                    outputStream.flush();
    
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        public void startService(String serviceName, String host, int port, Class impl) throws Throwable{
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:"+port+":執行");
           // registerService.regRemote(ServiceName, impl);
            registerServiceWithRegCenter.regRemote(serviceName,host,port,impl);
            try{
                while(true){
                    new Thread(new ServerTask(serverSocket.accept(),
                            registerServiceWithRegCenter)).start();
                }
            }finally {
                serverSocket.close();
            }
        }
    
    }
    
    
    

服務註冊中心

  • package cn.enjoyedu.rpc.rpc.base;
    
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 類說明:註冊服務到本地快取
     */
    @Service
    public class RegisterService {
    
        /*本地可以提供服務的一個容器*/
        private static final Map<String,Class> serviceCache = new ConcurrentHashMap<>();
    
        /*註冊本服務*/
        public void regService(String serviceName,Class impl){
            serviceCache.put(serviceName,impl);
        }
    
        /*獲取服務*/
        public Class getLocalService(String serviceName){
            return serviceCache.get(serviceName);
        }
    
    
    }
    
    

簡訊服務啟動

SmsRpcServer

  • package cn.enjoyedu.rpc.rpc.sms;
    
    import cn.enjoyedu.rpc.rpc.base.RpcServerFrame;
    import cn.enjoyedu.rpc.remote.SendSms;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.Random;
    
    /**
     * 類說明:rpc的服務端,提供服務
     */
    @Service
    public class SmsRpcServer {
    
        // rpc的base框架部分注入
        @Autowired
        private RpcServerFrame rpcServerFrame;
    
        @PostConstruct
        public void server() throws Throwable {
            Random r = new Random();
            // 埠號
            int port = 8778+r.nextInt(100);
            // 啟動服務
            rpcServerFrame.startService(SendSms.class.getName(),
                    "127.0.0.1",port,SendSmsImpl.class);
    
        }
    
    }
    
    

SendSmsImpl—具體的服務實現類

  • package cn.enjoyedu.rpc.rpc.sms;
    
    
    import cn.enjoyedu.rpc.remote.vo.UserInfo;
    import cn.enjoyedu.rpc.remote.SendSms;
    
    /**
     *@author Mark老師   享學課堂 https://enjoy.ke.qq.com 
     *
     *類說明:簡訊息傳送服務的實現
     */
    public class SendSmsImpl implements SendSms {
    
        @Override
        public boolean sendMail(UserInfo user) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("已傳送簡訊息給:"+user.getName()+"到【"+user.getPhone()+"】");
            return true;
        }
    }
    
    

將簡訊服務拆分成rpc服務—客戶端

  • 類似於伺服器端存根,也需要有客戶端存根

客戶端框架類

  • package cn.enjoyedu.rpc.client.rpc;
    
    import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo;
    import org.springframework.stereotype.Service;
    
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.Set;
    
    /**
     *@author Mark老師   
     *類說明:rpc框架的客戶端代理部分
     */
    @Service
    public class RpcClientFrame {
    
        /*遠端服務的代理物件,引數為客戶端要呼叫的的服務*/
        // 傳遞的應該是方法所在的類名或介面名
        public static<T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {
            /*獲得遠端服務的一個網路地址*/
            InetSocketAddress addr = //new InetSocketAddress("127.0.0.1",8778);
               getService(serviceInterface.getName());
    
            /*拿到一個代理物件,由這個代理物件通過網路進行實際的服務呼叫*/
            // 應該產生一個代理物件,和伺服器進行通訊
            // 需要實現了InvocationHandler的類DynProxy
            return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                    new Class<?>[]{serviceInterface},
                    new DynProxy(serviceInterface,addr));
        }
    
    
        /*動態代理,實現對遠端服務的訪問*/
        // 連線伺服器,傳送介面名、方法名、方法引數
        private static class DynProxy implements InvocationHandler{
            private Class<?> serviceInterface;
            private InetSocketAddress addr;
    
            public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
                this.serviceInterface = serviceInterface;
                this.addr = addr;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args)
                    throws Throwable {
                Socket socket = null;
                ObjectInputStream inputStream = null;
                ObjectOutputStream outputStream = null;
                try{
                    
                    // 1.建立網路連線
                    socket = new Socket();
                    socket.connect(addr);
                    outputStream = new ObjectOutputStream(socket.getOutputStream());
    
                    // 2.傳送介面名、方法名、引數型別、引數值
                    //方法所在類名介面名
                    outputStream.writeUTF(serviceInterface.getName());
                    //方法的名字
                    outputStream.writeUTF(method.getName());
                    //方法的入參型別
                    outputStream.writeObject(method.getParameterTypes());
                    //方法入參的值
                    outputStream.writeObject(args);
    
                    outputStream.flush();
    
                    // 3.接受伺服器傳回的結果
                    inputStream = new ObjectInputStream(socket.getInputStream());
                    /*接受伺服器的輸出*/
                    System.out.println(serviceInterface+" remote exec success!");
                    return inputStream.readObject();
    
                }finally {
                    if(socket!=null) socket.close();
                    if(outputStream!=null) outputStream.close();
                    if(inputStream!=null) inputStream.close();
    
                }
            }
        }
    
    
    
        /*----------------以下和動態獲得服務提供者有關------------------------------*/
    
        private static Random r = new Random();
    
        /*獲得遠端服務的地址*/
        private static InetSocketAddress getService(String serviceName)
                throws Exception {
            //獲得服務提供者的地址列表
            List<InetSocketAddress> serviceVoList = getServiceList(serviceName);
            InetSocketAddress addr
                    = serviceVoList.get(r.nextInt(serviceVoList.size()));
            System.out.println("本次選擇了伺服器:"+addr);
            return addr;
        }
    
        /*獲得服務提供者的地址*/
        private static List<InetSocketAddress> getServiceList(String serviceName)
                throws Exception {
            Socket socket = null;
            ObjectOutputStream output = null;
            ObjectInputStream input = null;
    
            try{
                socket = new Socket();
                socket.connect(new InetSocketAddress("127.0.0.1",9999));
    
                output = new ObjectOutputStream(socket.getOutputStream());
                //需要獲得服務提供者
                output.writeBoolean(true);
                //告訴註冊中心服務名
                output.writeUTF(serviceName);
                output.flush();
    
                input = new ObjectInputStream(socket.getInputStream());
                Set<RegisterServiceVo> result
                        = (Set<RegisterServiceVo>)input.readObject();
                List<InetSocketAddress> services = new ArrayList<>();
                for(RegisterServiceVo serviceVo : result){
                    String host = serviceVo.getHost();//獲得服務提供者的IP
                    int port = serviceVo.getPort();//獲得服務提供者的埠號
                    InetSocketAddress serviceAddr = new InetSocketAddress(host,port);
                    services.add(serviceAddr);
                }
                System.out.println("獲得服務["+serviceName
                        +"]提供者的地址列表["+services+"],準備呼叫.");
                return services;
            }finally{
                if (socket!=null) socket.close();
                if (output!=null) output.close();
                if (input!=null) input.close();
            }
    
        }
    
    }
    
    

遠端的服務怎麼通過bean接管

  • package cn.enjoyedu.rpc.client.config;
    
    import cn.enjoyedu.rpc.client.rpc.RpcClientFrame;
    import cn.enjoyedu.rpc.remote.SendSms;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 類說明:
     */
    @Configuration
    public class BeanConfig {
    
        @Autowired
        private RpcClientFrame rpcClientFrame;
    
        // 獲取服務端的方法名
        @Bean
        public SendSms getSmsService() throws Exception{
            return rpcClientFrame.getRemoteProxyObject(SendSms.class);
        }
    }
    
    

rpc測試

啟動服務端

  • package cn.enjoyedu.rpc;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class RpcServerSmsApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RpcServerSmsApplication.class, args);
        }
    
    }
    
    

客戶端測試方法

  • package cn.enjoyedu.rpc.client;
    
    import cn.enjoyedu.rpc.client.service.NormalBusi;
    import cn.enjoyedu.rpc.remote.SendSms;
    import cn.enjoyedu.rpc.remote.StockService;
    import cn.enjoyedu.rpc.remote.vo.UserInfo;
    import org.junit.jupiter.api.Test;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RpcClientApplicationTests {
    
        @Autowired
        private NormalBusi normalBusi;
    //    @Autowired
    //    private StockService stockService;
        @Autowired
        private SendSms sendSms;
    
        @Test
        void rpcTest() {
            long start = System.currentTimeMillis();
            normalBusi.business();
    
            /*傳送郵件*/
            UserInfo userInfo = new UserInfo("Mark","[email protected]");
            System.out.println("Send mail: "+ sendSms.sendMail(userInfo));
            System.out.println("共耗時:"+(System.currentTimeMillis()-start)+"ms");
    
    
            /*扣減庫存*/
    //        stockService.addStock("A001",1000);
    //        stockService.deduceStock("B002",50);
        }
    
    }
    
    

測試總結

  • 現在在客戶端只有SendSms這樣一個介面,並沒有實現類
  • 把代理類作為實現類注入到spring的容器裡面
  • 代理類裡面,把對方法的呼叫通過網路轉給了遠端簡訊傳送伺服器上
  • 而遠端簡訊傳送伺服器上,通過一個map容器儲存了改伺服器上能提供的服務
  • 在伺服器端啟動rpc服務時,把具體的實現放到hashmap中,並且啟動了一個網路相關的服務,專門接受客戶端傳遞過來的方法呼叫請求
  • 從而實現了一次rpc的遠端方法呼叫

註冊中心

  • dubbo等rpc框架存在一個註冊中心,而現在的rpc呼叫只有一個,如果要實現叢集化,如果要提供多個rpc服務,難道客戶端呼叫的時候要寫多個伺服器地址進去?

  • 註冊中心本質也是提供rpc服務,只有兩種,提供服務的註冊,提供服務的查詢

RegisterCenter

  • package cn.enjoyedu.rpc.rpc.reg.service;
    
    import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    /**
     * @author Mark老師  
     * 類說明:服務註冊中心,服務提供者在啟動時需要在註冊中心登記自己的資訊
     */
    @Service
    public class RegisterCenter {
        /*key表示服務名,value代表服務提供者地址的集合*/
        private static final Map<String,Set<RegisterServiceVo>> serviceHolder
                = new HashMap<>();
    
        /*註冊服務的埠號*/
        private int port;
    
        /*服務註冊,考慮到可能有多個提供者同時註冊,進行加鎖*/
        private static synchronized void registerService(String serviceName,
                                    String host,int port){
            //獲得當前服務的已有地址集合
            Set<RegisterServiceVo> serviceVoSet = serviceHolder.get(serviceName);
            if(serviceVoSet==null){
                //已有地址集合為空,新增集合
                serviceVoSet = new HashSet<>();
                serviceHolder.put(serviceName,serviceVoSet);
            }
            //將新的服務提供者加入集合
            serviceVoSet.add(new RegisterServiceVo(host,port));
            System.out.println("服務已註冊["+serviceName+"]," +
                    "地址["+host+"],埠["+port+"]");
        }
    
        /*取出服務提供者*/
        private static Set<RegisterServiceVo> getService(String serviceName){
            return serviceHolder.get(serviceName);
        }
    
        /*處理服務請求的任務,其實無非就是兩種服務:
        1、服務註冊服務
        2、服務查詢服務
        */
        private static class ServerTask implements Runnable{
            private Socket client = null;
    
            public ServerTask(Socket client){
                this.client = client;
            }
    
            public void run() {
    
                try(ObjectInputStream inputStream =
                            new ObjectInputStream(client.getInputStream());
                    ObjectOutputStream outputStream =
                            new ObjectOutputStream(client.getOutputStream())){
    
                    /*檢查當前請求是註冊服務還是獲得服務*/
                    boolean isGetService = inputStream.readBoolean();
                    /*服務查詢服務,獲得服務提供者*/
                    if(isGetService){
                        String serviceName = inputStream.readUTF();
                        /*取出服務提供者集合*/
                        Set<RegisterServiceVo> result = getService(serviceName);
                        /*返回給客戶端*/
                        outputStream.writeObject(result);
                        outputStream.flush();
                        System.out.println("將已註冊的服務["+serviceName+"提供給客戶端");
                    }
                    /*服務註冊服務*/
                    else{
                        /*取得新服務提供方的ip和埠*/
                        String serviceName = inputStream.readUTF();
                        String host = inputStream.readUTF();
                        int port = inputStream.readInt();
                        /*在註冊中心儲存*/
                        registerService(serviceName,host,port);
                        outputStream.writeBoolean(true);
                        outputStream.flush();
                    }
                }catch(Exception e){
                    e.printStackTrace();
                }finally {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /*啟動註冊服務*/
        public void startService() throws IOException {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("服務註冊中心 on:"+port+":執行");
            try{
                while(true){
                    new Thread(new ServerTask(serverSocket.accept())).start();
                }
            }finally {
                serverSocket.close();
            }
        }
    
        // 服務註冊中心埠是9999
        @PostConstruct
        public void init() {
            this.port = 9999;
            new Thread(new Runnable() {
                public void run() {
                    try{
                        startService();
                    }catch(IOException e){
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    
    

測試使用服務註冊中心

  • package cn.enjoyedu.rpc;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class RpcRegApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RpcRegApplication.class, args);
        }
    
    }
    
    

修改響應的服務提供者

  • 之前的服務都是註冊在本地,現在除了在本地存一份,現在改成註冊到註冊中心,在哪個ip地址上提供了哪些服務
RegisterServiceWithRegCenter
  • package cn.enjoyedu.rpc.rpc.base;
    
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 類說明:註冊服務,引入了服務的註冊和發現機制
     */
    @Service
    public class RegisterServiceWithRegCenter {
    
        /*本地可提供服務的一個名單,用快取實現*/
        private static final Map<String,Class> serviceCache
                = new ConcurrentHashMap<>();
    
        /*往遠端註冊伺服器註冊本服務,同時在本地註冊本服務*/
        public void regRemote(String serviceName, String host, int port, Class impl)
                throws Throwable{
            //登記到註冊中心
            Socket socket = null;
            ObjectOutputStream output = null;
            ObjectInputStream input = null;
    
            try{
                socket = new Socket();
                socket.connect(new InetSocketAddress("127.0.0.1",9999));
    
                output = new ObjectOutputStream(socket.getOutputStream());
                /*註冊服務*/
                output.writeBoolean(false);
                /*提供的服務名*/
                output.writeUTF(serviceName);
                /*服務提供方的IP*/
                output.writeUTF(host);
                /*服務提供方的埠*/
                output.writeInt(port);
                output.flush();
    
                input = new ObjectInputStream(socket.getInputStream());
                if(input.readBoolean()){
                    System.out.println("服務["+serviceName+"]註冊成功!");
                }
    
                /*可提供服務放入本地快取*/
                serviceCache.put(serviceName,impl);
    
            } catch (IOException e) {
                e.printStackTrace();
            }  finally{
                if (socket!=null) socket.close();
                if (output!=null) output.close();
                if (input!=null) input.close();
            }
        }
    
        /*獲取服務*/
        public Class getLocalService(String serviceName) {
            return serviceCache.get(serviceName);
        }
    
    }
    
    

修改rpcServerFrame

  • package cn.enjoyedu.rpc.rpc.base;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.Method;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     *@author Mark老師  
     *
     *類說明:rpc框架的服務端部分
     */
    @Service
    public class RpcServerFrame {
    
    //    @Autowired
    //    private RegisterService registerService;
        // 修改1
        @Autowired
        private RegisterServiceWithRegCenter registerServiceWithRegCenter;
    
        //服務的埠號
        private int port;
    
        /*處理服務請求任務*/
        private static class ServerTask implements Runnable{
    
            private Socket socket;
            // 修改3
            private RegisterServiceWithRegCenter registerServiceWithRegCenter;
    
            // 修改4
            public ServerTask(Socket client,
                              RegisterServiceWithRegCenter registerServiceWithRegCenter) {
                this.socket = client;
                this.registerServiceWithRegCenter = registerServiceWithRegCenter;
            }
    		
            // 接受客戶端的請求,並呼叫實際的方法
            // 接受的內容---1.方法所在的類名介面名 2.呼叫的方法名  3.方法引數及方法屬性、具體的引數值
            // 
            @Override
            public void run() {
                try(
                     ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                      ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){
    
                    /*方法所在類名介面名*/
                    // 讀字串inputStream.readUTF()
                    String serviceName = inputStream.readUTF();
                    /*方法的名字*/
                    String methodName = inputStream.readUTF();
                    /*方法的入參型別*/
                    // 讀物件inputStream.readObject()
                    // 強制轉型,轉成類
                    Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject();
                    /*方法的入參的值*/
                    Object[] args = (Object[]) inputStream.readObject();
    
                    // 修改5
                    /*從容器中拿到服務的Class物件*/
                    Class serviceClass = registerServiceWithRegCenter.getLocalService(serviceName);
                    if(serviceClass == null){
                        throw new ClassNotFoundException(serviceName+ " not found");
                    }
    
                    /*通過反射,執行實際的服務*/
                    // 通過反射獲取具體的方法Method物件
                    // 入參包括兩個,方法名和引數型別陣列
                    Method method = serviceClass.getMethod(methodName, paramTypes);
                    // 通過動態代理呼叫響應的方法
                    Object result  = method.invoke(serviceClass.newInstance(),args);
    
                    /*將服務的執行結果通知呼叫者*/
                    // 將呼叫結果輸出給呼叫者,即客戶端
                    outputStream.writeObject(result);
                    outputStream.flush();
    
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        public void startService(String serviceName, String host, int port, Class impl) throws Throwable{
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:"+port+":執行");
            // 修改2
           // registerService.regRemote(ServiceName, impl);
    	
                  registerServiceWithRegCenter.regRemote(serviceName,host,port,impl);
            try{
                while(true){
                    new Thread(new ServerTask(serverSocket.accept(),
                            registerServiceWithRegCenter)).start();
                }
            }finally {
                serverSocket.close();
            }
        }
    
    }
    
    
    

測試註冊中心

  • 為了多註冊服務,埠號取隨機的

  • 啟動sms

  • 再啟動一次sms

  • 客戶端啟動

    • 動態代理的地址就不能寫死了,應該從註冊中心獲取地址

    • package cn.enjoyedu.rpc.client.rpc;
      
      import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo;
      import org.springframework.stereotype.Service;
      
      import java.io.ObjectInputStream;
      import java.io.ObjectOutputStream;
      import java.lang.reflect.InvocationHandler;
      import java.lang.reflect.Method;
      import java.lang.reflect.Proxy;
      import java.net.InetSocketAddress;
      import java.net.Socket;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Random;
      import java.util.Set;
      
      /**
       *@author Mark老師   
       *類說明:rpc框架的客戶端代理部分
       */
      @Service
      public class RpcClientFrame {
      
          /*遠端服務的代理物件,引數為客戶端要呼叫的的服務*/
          // 傳遞的應該是方法所在的類名或介面名
          public static<T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception {
              /*獲得遠端服務的一個網路地址*/
              // 修改1
              // InetSocketAddress addr = new InetSocketAddress("127.0.0.1",8778);
              InetSocketAddress addr =   getService(serviceInterface.getName());
      
              /*拿到一個代理物件,由這個代理物件通過網路進行實際的服務呼叫*/
              // 應該產生一個代理物件,和伺服器進行通訊
              // 需要實現了InvocationHandler的類DynProxy
              return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(),
                      new Class<?>[]{serviceInterface},
                      new DynProxy(serviceInterface,addr));
          }
      
      
          /*動態代理,實現對遠端服務的訪問*/
          // 連線伺服器,傳送介面名、方法名、方法引數
          private static class DynProxy implements InvocationHandler{
              private Class<?> serviceInterface;
              private InetSocketAddress addr;
      
              public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {
                  this.serviceInterface = serviceInterface;
                  this.addr = addr;
              }
      
              @Override
              public Object invoke(Object proxy, Method method, Object[] args)
                      throws Throwable {
                  Socket socket = null;
                  ObjectInputStream inputStream = null;
                  ObjectOutputStream outputStream = null;
                  try{
                      
                      // 1.建立網路連線
                      socket = new Socket();
                      socket.connect(addr);
                      outputStream = new ObjectOutputStream(socket.getOutputStream());
      
                      // 2.傳送介面名、方法名、引數型別、引數值
                      //方法所在類名介面名
                      outputStream.writeUTF(serviceInterface.getName());
                      //方法的名字
                      outputStream.writeUTF(method.getName());
                      //方法的入參型別
                      outputStream.writeObject(method.getParameterTypes());
                      //方法入參的值
                      outputStream.writeObject(args);
      
                      outputStream.flush();
      
                      // 3.接受伺服器傳回的結果
                      inputStream = new ObjectInputStream(socket.getInputStream());
                      /*接受伺服器的輸出*/
                      System.out.println(serviceInterface+" remote exec success!");
                      return inputStream.readObject();
      
                  }finally {
                      if(socket!=null) socket.close();
                      if(outputStream!=null) outputStream.close();
                      if(inputStream!=null) inputStream.close();
      
                  }
              }
          }
      
      
      
          /*----------------以下和動態獲得服務提供者有關------------------------------*/
      
          private static Random r = new Random();
      
          /*獲得遠端服務的地址*/
          private static InetSocketAddress getService(String serviceName)
                  throws Exception {
              //獲得服務提供者的地址列表
              List<InetSocketAddress> serviceVoList = getServiceList(serviceName);
              InetSocketAddress addr
                      = serviceVoList.get(r.nextInt(serviceVoList.size()));
              System.out.println("本次選擇了伺服器:"+addr);
              return addr;
          }
      
          /*獲得服務提供者的地址*/
          private static List<InetSocketAddress> getServiceList(String serviceName)
                  throws Exception {
              Socket socket = null;
              ObjectOutputStream output = null;
              ObjectInputStream input = null;
      
              try{
                  socket = new Socket();
                  socket.connect(new InetSocketAddress("127.0.0.1",9999));
      
                  output = new ObjectOutputStream(socket.getOutputStream());
                  //需要獲得服務提供者
                  output.writeBoolean(true);
                  //告訴註冊中心服務名
                  output.writeUTF(serviceName);
                  output.flush();
      
                  input = new ObjectInputStream(socket.getInputStream());
                  Set<RegisterServiceVo> result
                          = (Set<RegisterServiceVo>)input.readObject();
                  List<InetSocketAddress> services = new ArrayList<>();
                  for(RegisterServiceVo serviceVo : result){
                      String host = serviceVo.getHost();//獲得服務提供者的IP
                      int port = serviceVo.getPort();//獲得服務提供者的埠號
                      InetSocketAddress serviceAddr = new InetSocketAddress(host,port);
                      services.add(serviceAddr);
                  }
                  System.out.println("獲得服務["+serviceName
                          +"]提供者的地址列表["+services+"],準備呼叫.");
                  return services;
              }finally{
                  if (socket!=null) socket.close();
                  if (output!=null) output.close();
                  if (input!=null) input.close();
              }
      
          }
      
      }
      
      
    • 取服務時,如果有多個,做負載均衡?

      用隨機數

高併發RPC解決方案

  • 基於TCP的RPC實現

    • Dubbo:
      阿里巴巴公司開源的一個高效能優秀的服務框架,使得應用可通過高效能的RPC實現服務的輸出和輸入功能,可以和Spring框架無縫整合。
  • Provider: 暴露服務的服務提供方。

  • Consumer: 呼叫遠端服務的服務消費方。

  • Registry: 服務註冊與發現的註冊中心。

  • Monitor: 統計服務的呼叫次調和呼叫時間的監控中心。

  • Container: 服務執行容器。