網路程式設計3
網路通訊基本常識
-
程式設計中的Socket是什麼?
-
Socket是應用層與TCP/IP協議族通訊的間軟體抽象層,它是一組介面,其實就是一個門面模式。TCP用主機的IP地址加上主機上的埠號作為TCP連線的端點,這種端點就叫做套接字(socket)。
-
socket是一種門面模式,遮蔽了與機器的交流過程,方便業務程式設計師開發程式碼
-
A的socket ---- B的socket
兩個socket建立關係,從而A和B就可以通訊了
-
-
短連線
-
一次通訊完了馬上就關閉連線
-
伺服器要服務成千上億的服務
-
短連線和長連線到底如何選擇,需要根據實際情況
-
-
長連線
-
一次連線內會進行多次通訊
-
維持住長連線主要消耗的是記憶體和FD檔案描述符
-
如果伺服器效能好,可以維持上百萬的長連線
-
操作頻繁、或者點對點的通訊上,連線池來維護這種長連線
-
-
InetAddress類 --> 只表示地址(主機)
-
InetAddress address = InetAddress.getByName(“www.baidu.com”)
-
可以獲得百度的ip地址
-
怎麼獲得的?-- dns解析
-
InetAddress address2 = InetAddress.getByName(“124.232.170.22”)
System.out.println(address2.getHostName())
同樣可以根據ip地址獲取域名,如果找不到,就會列印源ip地址
-
InetAddress[] allIp= InetAddress.getAllByName(“www.baidu.com”)
獲取百度域名下面所有的ip地址
-
把一個ip地址包裝成InetAddress物件
byte[] bytes = {(byte)192,(byte)168,56,1};
InetAddress address = InetAddress.getByAddress(bytes);
InetSocketAddress類 --> 主機名+埠
NetoworkInterface類
-
開啟裝置管理器-網路介面卡
NetoworkInterface會把這些全部找出來,除了這些,還有別的
// 127.0.0.1-------本地迴環介面
InetAddress address = InetAddress.getByName(“127.0.0.1”);
NetworkInterface byInetAddress = NetoworkInterface.getByInetAddress(address);
// 獲取所有的網路通訊介面類
// 包括本地迴環、網路介面卡、wifi、廣域網、硬體裝置通訊
NetoworkInterface.getNetoworkInterfaces();
-
-
服務端、客戶端、通訊程式設計關注的三件事
-
所有網路通訊一定有服務端和客戶端這兩樣
-
提供服務的稱為服務端
-
連線服務的稱為客戶端
-
某個類有Server、ServerSocket,那麼這個類往往是給服務端使用的
ServerSocket只是個容器,容納網路服務用的
-
某個類只有socket,一般是負責具體的網路讀寫
真實網路通訊中,真正進行網路讀寫的都是socket
-
-
網路程式設計中一定要關注的點
- 網路連線
- 讀網路資料
- 寫網路資料
-
jdk網路通訊
-
bio
-
nio
-
aio—非主流
-
原生JDK網路程式設計-BIO
-
BIO:block ,即阻塞式io
-
ServerSocket負責繫結IP地址,啟動監聽埠;
-
Socket負責發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊
-
ServerSocket啟動了處於監聽狀態 通過accept()方法來響應請求的socket() 新生成一個執行緒,線上程中生成一個socket()來與請求的socket()通訊
伺服器端
-
package WangLuoBianCheng.wangLuoBianCheng3; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; public class Server{ public static void main(String[] args)throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(10001)); System.out.println("Server is started..."); while(true){ // 來一個socket請求,就會new一個執行緒,同時new 一個socket // 執行server.accept()成功,如果三次握手完成會返回一個Socket // 更優的寫法是用執行緒池,實現複用 new Thread(new ServerTask(server.accept())).start(); } } private static class ServerTask implements Runnable{ private Socket socket = null; public ServerTask(Socket socket) { this.socket = socket; } @Override public void run(){ try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())){ String userName = input.readUTF(); System.out.println("Accept client Message:"+userName); output.writeUTF("Hello,"+userName); // 上一句只是寫入快取,準備傳送 // 強制刷出 output.flush(); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
客戶端
-
package WangLuoBianCheng.wangLuoBianCheng3; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.Socket; public class Client { public static void main(String[] args) throws IOException { // 用來通訊的socket Socket socket = null; // 注意客戶端必須是輸出在上,輸入在下,確保流通道建立起來 // 因為如果兩端都是輸入,怎麼傳遞? // 或者兩端都是先建立輸出流,再建立輸入流,未測試? ObjectOutputStream objectOutputStream = null; InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 10001); ObjectInputStream objectInputStream = null; try { socket = new Socket(); // 建立連線 socket.connect(inetSocketAddress); objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); objectInputStream = new ObjectInputStream(socket.getInputStream()); // 發出訊息 objectOutputStream.writeUTF("James"); objectOutputStream.flush(); // 接受伺服器響應的資訊並列印 // 執行完後關閉了網路連線 System.out.println(objectInputStream.readUTF()); } catch (IOException e) { e.printStackTrace(); } finally { if (socket!=null) socket.close(); if (objectOutputStream!=null) objectOutputStream.close(); if (objectInputStream!=null) objectInputStream.close(); } } }
改進—偽非同步IO模型
-
上述伺服器端是來一個socket請求,就會new一個執行緒,同時new 一個socket
-
為了實現執行緒複用,應該採用執行緒池的執行緒來處理任務,而這種模式又稱為偽非同步IO模型
-
package WangLuoBianCheng.wangLuoBianCheng3; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ServerPool { private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2); public static void main(String[] args)throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(10001)); System.out.println("Server is started..."); while(true){ // 執行server.accept()成功,如果三次握手完成會返回一個Socket executorService.execute(new ServerTask(server.accept())); } } private static class ServerTask implements Runnable{ private Socket socket = null; public ServerTask(Socket socket) { this.socket = socket; } @Override public void run(){ try (ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream())){ String userName = input.readUTF(); System.out.println("Accept client Message:"+userName); output.writeUTF("Hello,"+userName); // 強制刷出 output.flush(); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
BIO應用-RPC框架
-
單WEB專案
- 1.訂單服務
- 2.扣減庫存服務
- 3.傳送簡訊提示服務
-
之前上述三個服務全是單執行緒執行,當呼叫服務的使用者數很大時,就不能充分發揮併發程式設計的高效,因此考慮將三個服務拆分,都分別由單獨的執行緒來執行
-
進一步把不同的服務部署到不同的伺服器上,而且每個服務是通過伺服器叢集的形式來部署,從而形成了分散式的架構
- 這時不同服務之間的呼叫就需要引入rpc,因為是跨伺服器跨網路的呼叫,不再是單機內部的方法呼叫
什麼是RPC?
- RPC(Remote Procedure Call ——遠端過程呼叫),它是一種通過網路從遠端計算機程式上請求服務,而不需要了解底層網路的技術。–來自《百度百科》
客戶端存根–server stub
-
扣減庫存服務,為了使呼叫遠端服務和呼叫本地服務沒有什麼區別,就需要引入客戶端存根
-
客戶端存根就是遠端方法在本地的模擬,包括方法名、方法引數
服務端存根–client stub
- 負責把客戶端請求發來的請求體,解析成跟伺服器實際方法相同的方法名、方法引數
總結
- rpc方法就是把上述過程全部包裝起來,使得呼叫遠端方法就跟呼叫本地方法一樣
RPC和HTTP
-
TCP/IP概念層模型 功能 TCP/IP協議族 檔案傳輸、電子郵件、檔案服務、虛擬終端 TFTP、HTTP、SNMP、FTP、SMTP、DNS、Telnet 應用層 資料格式化、程式碼轉換、資料加密 無 解除或建立與別的接點的聯絡 無 傳輸層 提供端與端的介面 TCP、UDP 網路層 為資料包選擇路由 IP、ICMP、RIP、OSPF、BGP、IGMP 鏈路層 傳輸有地址的幀以及錯誤檢測功能 SLIP、CSLIP、PPP、ARP、RARP、MTU 以二進位制資料形式在物理媒體上傳輸資料 ISO2110、IEEE802、IEEE802.2
rpc究竟在上面協議分層的哪一層?
- rpc只是一種思想,對不同服務呼叫的一種描述,既可以通過http實現,也可以通過tcp、htp,所以rpc和http不是一個層級的東西
實現RPC框架需要解決的那些問題?
-
通訊問題
- 不要每次方法呼叫,都需要建立socket連線
-
代理問題
-
可以不可以通過代理的方式,每次的通訊都通過指定的代理來解決
-
代理問題的解決
代理模式,用動態代理,為什麼不用靜態代理?-- 針對每一個服務都要建立相應的代理類,而rpc框架不知道要建立哪些靜態代理,所以使用動態代理,不管你要呼叫什麼服務,通過動態代理一把解決
-
-
序列化問題
-
訊息在網路中傳輸是位元組的形式,怎麼把01位元組變成javaBean的形式
-
序列化問題的解決
實現Serializable介面,但是jdk的序列化效能很差
測試結果1-----jdk的位元組碼長度:133 自己位元組碼長度:24
測試結果2-----jdk序列化耗時:1364ms 自己序列化耗時:108ms
-
-
服務例項化
-
方法呼叫時只有一個方法名和方法引數,怎麼轉化成呼叫具體的服務
-
服務例項化的解決
通過反射解決
-
序列化耗時測試
測試1
-
package WangLuoBianCheng.wangLuoBianCheng3.rpc.prepare.serial.protogenesis; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; /** * @author Mark老師 享學課堂 https://enjoy.ke.qq.com * 類說明:測試序列化後位元組大小 */ public class TestUserInfo { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); //使用jdk的序列化 ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); System.out.println("The jdk serializable length is : " + b.length); bos.close(); //使用自行的序列化 System.out.println("-------------------------------------"); System.out.println("The byte array serializable length is : " + info.codeC().length); } }
測試2
-
package WangLuoBianCheng.wangLuoBianCheng3.rpc.prepare.serial.protogenesis; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; /** * @author Mark老師 享學課堂 https://enjoy.ke.qq.com * 類說明:測試序列化效能差異 */ public class PerformTestUserInfo { public static void main(String[] args) throws IOException { UserInfo info = new UserInfo(); info.buildUserID(100).buildUserName("Welcome to Netty"); int loop = 1000000; //使用jdk的序列化 ByteArrayOutputStream bos = null; ObjectOutputStream os = null; long startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { bos = new ByteArrayOutputStream(); os = new ObjectOutputStream(bos); os.writeObject(info); os.flush(); os.close(); byte[] b = bos.toByteArray(); bos.close(); } long endTime = System.currentTimeMillis(); System.out.println("The jdk serializable cost time is : " + (endTime - startTime) + " ms"); //使用自行的序列化 System.out.println("-------------------------------------"); ByteBuffer buffer = ByteBuffer.allocate(1024); startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { byte[] b = info.codeC(buffer); } endTime = System.currentTimeMillis(); System.out.println("The byte array serializable cost time is : " + (endTime - startTime) + " ms"); } }
實現rpc框架
- 1.服務端定義介面和服務實現類並且註冊服務
- 2.客戶端使用動態代理呼叫服務(動態代理)
- 3.客戶端代理把呼叫物件、方法、引數序列化成資料
- 4.客戶端代理與服務端通過Socket通訊傳輸資料
- 5.服務端反序列化資料成物件、方法、引數。
- 6.服務端代理拿到這些物件和引數後通過反射的機制呼叫服務的例項。
將簡訊服務拆分成rpc服務—服務端
- 服務實體類,服務需要的相關引數型別 -----> 這些可以看出服務端存根
所有服務使用的服務框架----RpcServerFrame
-
package cn.enjoyedu.rpc.rpc.base; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** *@author Mark老師 * *類說明:rpc框架的服務端部分 */ @Service public class RpcServerFrame { // @Autowired // private RegisterService registerService; @Autowired private RegisterServiceWithRegCenter registerServiceWithRegCenter; //服務的埠號 private int port; /*處理服務請求任務*/ private static class ServerTask implements Runnable{ private Socket socket; private RegisterServiceWithRegCenter registerServiceWithRegCenter; public ServerTask(Socket client, RegisterServiceWithRegCenter registerServiceWithRegCenter) { this.socket = client; this.registerServiceWithRegCenter = registerServiceWithRegCenter; } // 接受客戶端的請求,並呼叫實際的方法 // 接受的內容---1.方法所在的類名介面名 2.呼叫的方法名 3.方法引數及方法屬性、具體的引數值 // @Override public void run() { try( ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){ /*方法所在類名介面名*/ // 讀字串inputStream.readUTF() String serviceName = inputStream.readUTF(); /*方法的名字*/ String methodName = inputStream.readUTF(); /*方法的入參型別*/ // 讀物件inputStream.readObject() // 強制轉型,轉成類 Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject(); /*方法的入參的值*/ Object[] args = (Object[]) inputStream.readObject(); /*從容器中拿到服務的Class物件*/ Class serviceClass = registerServiceWithRegCenter.getLocalService(serviceName); if(serviceClass == null){ throw new ClassNotFoundException(serviceName+ " not found"); } /*通過反射,執行實際的服務*/ // 通過反射獲取具體的方法Method物件 // 入參包括兩個,方法名和引數型別陣列 Method method = serviceClass.getMethod(methodName, paramTypes); // 通過動態代理呼叫響應的方法 Object result = method.invoke(serviceClass.newInstance(),args); /*將服務的執行結果通知呼叫者*/ // 將呼叫結果輸出給呼叫者,即客戶端 outputStream.writeObject(result); outputStream.flush(); }catch (Exception e){ e.printStackTrace(); }finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public void startService(String serviceName, String host, int port, Class impl) throws Throwable{ ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("RPC server on:"+port+":執行"); // registerService.regRemote(ServiceName, impl); registerServiceWithRegCenter.regRemote(serviceName,host,port,impl); try{ while(true){ new Thread(new ServerTask(serverSocket.accept(), registerServiceWithRegCenter)).start(); } }finally { serverSocket.close(); } } }
服務註冊中心
-
package cn.enjoyedu.rpc.rpc.base; import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 類說明:註冊服務到本地快取 */ @Service public class RegisterService { /*本地可以提供服務的一個容器*/ private static final Map<String,Class> serviceCache = new ConcurrentHashMap<>(); /*註冊本服務*/ public void regService(String serviceName,Class impl){ serviceCache.put(serviceName,impl); } /*獲取服務*/ public Class getLocalService(String serviceName){ return serviceCache.get(serviceName); } }
簡訊服務啟動
SmsRpcServer
-
package cn.enjoyedu.rpc.rpc.sms; import cn.enjoyedu.rpc.rpc.base.RpcServerFrame; import cn.enjoyedu.rpc.remote.SendSms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Random; /** * 類說明:rpc的服務端,提供服務 */ @Service public class SmsRpcServer { // rpc的base框架部分注入 @Autowired private RpcServerFrame rpcServerFrame; @PostConstruct public void server() throws Throwable { Random r = new Random(); // 埠號 int port = 8778+r.nextInt(100); // 啟動服務 rpcServerFrame.startService(SendSms.class.getName(), "127.0.0.1",port,SendSmsImpl.class); } }
SendSmsImpl—具體的服務實現類
-
package cn.enjoyedu.rpc.rpc.sms; import cn.enjoyedu.rpc.remote.vo.UserInfo; import cn.enjoyedu.rpc.remote.SendSms; /** *@author Mark老師 享學課堂 https://enjoy.ke.qq.com * *類說明:簡訊息傳送服務的實現 */ public class SendSmsImpl implements SendSms { @Override public boolean sendMail(UserInfo user) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("已傳送簡訊息給:"+user.getName()+"到【"+user.getPhone()+"】"); return true; } }
將簡訊服務拆分成rpc服務—客戶端
- 類似於伺服器端存根,也需要有客戶端存根
客戶端框架類
-
package cn.enjoyedu.rpc.client.rpc; import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo; import org.springframework.stereotype.Service; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; /** *@author Mark老師 *類說明:rpc框架的客戶端代理部分 */ @Service public class RpcClientFrame { /*遠端服務的代理物件,引數為客戶端要呼叫的的服務*/ // 傳遞的應該是方法所在的類名或介面名 public static<T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception { /*獲得遠端服務的一個網路地址*/ InetSocketAddress addr = //new InetSocketAddress("127.0.0.1",8778); getService(serviceInterface.getName()); /*拿到一個代理物件,由這個代理物件通過網路進行實際的服務呼叫*/ // 應該產生一個代理物件,和伺服器進行通訊 // 需要實現了InvocationHandler的類DynProxy return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new DynProxy(serviceInterface,addr)); } /*動態代理,實現對遠端服務的訪問*/ // 連線伺服器,傳送介面名、方法名、方法引數 private static class DynProxy implements InvocationHandler{ private Class<?> serviceInterface; private InetSocketAddress addr; public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) { this.serviceInterface = serviceInterface; this.addr = addr; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try{ // 1.建立網路連線 socket = new Socket(); socket.connect(addr); outputStream = new ObjectOutputStream(socket.getOutputStream()); // 2.傳送介面名、方法名、引數型別、引數值 //方法所在類名介面名 outputStream.writeUTF(serviceInterface.getName()); //方法的名字 outputStream.writeUTF(method.getName()); //方法的入參型別 outputStream.writeObject(method.getParameterTypes()); //方法入參的值 outputStream.writeObject(args); outputStream.flush(); // 3.接受伺服器傳回的結果 inputStream = new ObjectInputStream(socket.getInputStream()); /*接受伺服器的輸出*/ System.out.println(serviceInterface+" remote exec success!"); return inputStream.readObject(); }finally { if(socket!=null) socket.close(); if(outputStream!=null) outputStream.close(); if(inputStream!=null) inputStream.close(); } } } /*----------------以下和動態獲得服務提供者有關------------------------------*/ private static Random r = new Random(); /*獲得遠端服務的地址*/ private static InetSocketAddress getService(String serviceName) throws Exception { //獲得服務提供者的地址列表 List<InetSocketAddress> serviceVoList = getServiceList(serviceName); InetSocketAddress addr = serviceVoList.get(r.nextInt(serviceVoList.size())); System.out.println("本次選擇了伺服器:"+addr); return addr; } /*獲得服務提供者的地址*/ private static List<InetSocketAddress> getServiceList(String serviceName) throws Exception { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try{ socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1",9999)); output = new ObjectOutputStream(socket.getOutputStream()); //需要獲得服務提供者 output.writeBoolean(true); //告訴註冊中心服務名 output.writeUTF(serviceName); output.flush(); input = new ObjectInputStream(socket.getInputStream()); Set<RegisterServiceVo> result = (Set<RegisterServiceVo>)input.readObject(); List<InetSocketAddress> services = new ArrayList<>(); for(RegisterServiceVo serviceVo : result){ String host = serviceVo.getHost();//獲得服務提供者的IP int port = serviceVo.getPort();//獲得服務提供者的埠號 InetSocketAddress serviceAddr = new InetSocketAddress(host,port); services.add(serviceAddr); } System.out.println("獲得服務["+serviceName +"]提供者的地址列表["+services+"],準備呼叫."); return services; }finally{ if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } }
遠端的服務怎麼通過bean接管
-
package cn.enjoyedu.rpc.client.config; import cn.enjoyedu.rpc.client.rpc.RpcClientFrame; import cn.enjoyedu.rpc.remote.SendSms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 類說明: */ @Configuration public class BeanConfig { @Autowired private RpcClientFrame rpcClientFrame; // 獲取服務端的方法名 @Bean public SendSms getSmsService() throws Exception{ return rpcClientFrame.getRemoteProxyObject(SendSms.class); } }
rpc測試
啟動服務端
-
package cn.enjoyedu.rpc; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RpcServerSmsApplication { public static void main(String[] args) { SpringApplication.run(RpcServerSmsApplication.class, args); } }
客戶端測試方法
-
package cn.enjoyedu.rpc.client; import cn.enjoyedu.rpc.client.service.NormalBusi; import cn.enjoyedu.rpc.remote.SendSms; import cn.enjoyedu.rpc.remote.StockService; import cn.enjoyedu.rpc.remote.vo.UserInfo; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class RpcClientApplicationTests { @Autowired private NormalBusi normalBusi; // @Autowired // private StockService stockService; @Autowired private SendSms sendSms; @Test void rpcTest() { long start = System.currentTimeMillis(); normalBusi.business(); /*傳送郵件*/ UserInfo userInfo = new UserInfo("Mark","[email protected]"); System.out.println("Send mail: "+ sendSms.sendMail(userInfo)); System.out.println("共耗時:"+(System.currentTimeMillis()-start)+"ms"); /*扣減庫存*/ // stockService.addStock("A001",1000); // stockService.deduceStock("B002",50); } }
測試總結
- 現在在客戶端只有SendSms這樣一個介面,並沒有實現類
- 把代理類作為實現類注入到spring的容器裡面
- 代理類裡面,把對方法的呼叫通過網路轉給了遠端簡訊傳送伺服器上
- 而遠端簡訊傳送伺服器上,通過一個map容器儲存了改伺服器上能提供的服務
- 在伺服器端啟動rpc服務時,把具體的實現放到hashmap中,並且啟動了一個網路相關的服務,專門接受客戶端傳遞過來的方法呼叫請求
- 從而實現了一次rpc的遠端方法呼叫
註冊中心
-
dubbo等rpc框架存在一個註冊中心,而現在的rpc呼叫只有一個,如果要實現叢集化,如果要提供多個rpc服務,難道客戶端呼叫的時候要寫多個伺服器地址進去?
-
註冊中心本質也是提供rpc服務,只有兩種,提供服務的註冊,提供服務的查詢
RegisterCenter
-
package cn.enjoyedu.rpc.rpc.reg.service; import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * @author Mark老師 * 類說明:服務註冊中心,服務提供者在啟動時需要在註冊中心登記自己的資訊 */ @Service public class RegisterCenter { /*key表示服務名,value代表服務提供者地址的集合*/ private static final Map<String,Set<RegisterServiceVo>> serviceHolder = new HashMap<>(); /*註冊服務的埠號*/ private int port; /*服務註冊,考慮到可能有多個提供者同時註冊,進行加鎖*/ private static synchronized void registerService(String serviceName, String host,int port){ //獲得當前服務的已有地址集合 Set<RegisterServiceVo> serviceVoSet = serviceHolder.get(serviceName); if(serviceVoSet==null){ //已有地址集合為空,新增集合 serviceVoSet = new HashSet<>(); serviceHolder.put(serviceName,serviceVoSet); } //將新的服務提供者加入集合 serviceVoSet.add(new RegisterServiceVo(host,port)); System.out.println("服務已註冊["+serviceName+"]," + "地址["+host+"],埠["+port+"]"); } /*取出服務提供者*/ private static Set<RegisterServiceVo> getService(String serviceName){ return serviceHolder.get(serviceName); } /*處理服務請求的任務,其實無非就是兩種服務: 1、服務註冊服務 2、服務查詢服務 */ private static class ServerTask implements Runnable{ private Socket client = null; public ServerTask(Socket client){ this.client = client; } public void run() { try(ObjectInputStream inputStream = new ObjectInputStream(client.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(client.getOutputStream())){ /*檢查當前請求是註冊服務還是獲得服務*/ boolean isGetService = inputStream.readBoolean(); /*服務查詢服務,獲得服務提供者*/ if(isGetService){ String serviceName = inputStream.readUTF(); /*取出服務提供者集合*/ Set<RegisterServiceVo> result = getService(serviceName); /*返回給客戶端*/ outputStream.writeObject(result); outputStream.flush(); System.out.println("將已註冊的服務["+serviceName+"提供給客戶端"); } /*服務註冊服務*/ else{ /*取得新服務提供方的ip和埠*/ String serviceName = inputStream.readUTF(); String host = inputStream.readUTF(); int port = inputStream.readInt(); /*在註冊中心儲存*/ registerService(serviceName,host,port); outputStream.writeBoolean(true); outputStream.flush(); } }catch(Exception e){ e.printStackTrace(); }finally { try { client.close(); } catch (IOException e) { e.printStackTrace(); } } } } /*啟動註冊服務*/ public void startService() throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("服務註冊中心 on:"+port+":執行"); try{ while(true){ new Thread(new ServerTask(serverSocket.accept())).start(); } }finally { serverSocket.close(); } } // 服務註冊中心埠是9999 @PostConstruct public void init() { this.port = 9999; new Thread(new Runnable() { public void run() { try{ startService(); }catch(IOException e){ e.printStackTrace(); } } }).start(); } }
測試使用服務註冊中心
-
package cn.enjoyedu.rpc; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RpcRegApplication { public static void main(String[] args) { SpringApplication.run(RpcRegApplication.class, args); } }
修改響應的服務提供者
- 之前的服務都是註冊在本地,現在除了在本地存一份,現在改成註冊到註冊中心,在哪個ip地址上提供了哪些服務
RegisterServiceWithRegCenter
-
package cn.enjoyedu.rpc.rpc.base; import org.springframework.stereotype.Service; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 類說明:註冊服務,引入了服務的註冊和發現機制 */ @Service public class RegisterServiceWithRegCenter { /*本地可提供服務的一個名單,用快取實現*/ private static final Map<String,Class> serviceCache = new ConcurrentHashMap<>(); /*往遠端註冊伺服器註冊本服務,同時在本地註冊本服務*/ public void regRemote(String serviceName, String host, int port, Class impl) throws Throwable{ //登記到註冊中心 Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try{ socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1",9999)); output = new ObjectOutputStream(socket.getOutputStream()); /*註冊服務*/ output.writeBoolean(false); /*提供的服務名*/ output.writeUTF(serviceName); /*服務提供方的IP*/ output.writeUTF(host); /*服務提供方的埠*/ output.writeInt(port); output.flush(); input = new ObjectInputStream(socket.getInputStream()); if(input.readBoolean()){ System.out.println("服務["+serviceName+"]註冊成功!"); } /*可提供服務放入本地快取*/ serviceCache.put(serviceName,impl); } catch (IOException e) { e.printStackTrace(); } finally{ if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } /*獲取服務*/ public Class getLocalService(String serviceName) { return serviceCache.get(serviceName); } }
修改rpcServerFrame
-
package cn.enjoyedu.rpc.rpc.base; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; /** *@author Mark老師 * *類說明:rpc框架的服務端部分 */ @Service public class RpcServerFrame { // @Autowired // private RegisterService registerService; // 修改1 @Autowired private RegisterServiceWithRegCenter registerServiceWithRegCenter; //服務的埠號 private int port; /*處理服務請求任務*/ private static class ServerTask implements Runnable{ private Socket socket; // 修改3 private RegisterServiceWithRegCenter registerServiceWithRegCenter; // 修改4 public ServerTask(Socket client, RegisterServiceWithRegCenter registerServiceWithRegCenter) { this.socket = client; this.registerServiceWithRegCenter = registerServiceWithRegCenter; } // 接受客戶端的請求,並呼叫實際的方法 // 接受的內容---1.方法所在的類名介面名 2.呼叫的方法名 3.方法引數及方法屬性、具體的引數值 // @Override public void run() { try( ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())){ /*方法所在類名介面名*/ // 讀字串inputStream.readUTF() String serviceName = inputStream.readUTF(); /*方法的名字*/ String methodName = inputStream.readUTF(); /*方法的入參型別*/ // 讀物件inputStream.readObject() // 強制轉型,轉成類 Class<?>[] paramTypes = (Class<?>[]) inputStream.readObject(); /*方法的入參的值*/ Object[] args = (Object[]) inputStream.readObject(); // 修改5 /*從容器中拿到服務的Class物件*/ Class serviceClass = registerServiceWithRegCenter.getLocalService(serviceName); if(serviceClass == null){ throw new ClassNotFoundException(serviceName+ " not found"); } /*通過反射,執行實際的服務*/ // 通過反射獲取具體的方法Method物件 // 入參包括兩個,方法名和引數型別陣列 Method method = serviceClass.getMethod(methodName, paramTypes); // 通過動態代理呼叫響應的方法 Object result = method.invoke(serviceClass.newInstance(),args); /*將服務的執行結果通知呼叫者*/ // 將呼叫結果輸出給呼叫者,即客戶端 outputStream.writeObject(result); outputStream.flush(); }catch (Exception e){ e.printStackTrace(); }finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public void startService(String serviceName, String host, int port, Class impl) throws Throwable{ ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(port)); System.out.println("RPC server on:"+port+":執行"); // 修改2 // registerService.regRemote(ServiceName, impl); registerServiceWithRegCenter.regRemote(serviceName,host,port,impl); try{ while(true){ new Thread(new ServerTask(serverSocket.accept(), registerServiceWithRegCenter)).start(); } }finally { serverSocket.close(); } } }
測試註冊中心
-
為了多註冊服務,埠號取隨機的
-
啟動sms
-
再啟動一次sms
-
客戶端啟動
-
動態代理的地址就不能寫死了,應該從註冊中心獲取地址
-
package cn.enjoyedu.rpc.client.rpc; import cn.enjoyedu.rpc.remote.vo.RegisterServiceVo; import org.springframework.stereotype.Service; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.Set; /** *@author Mark老師 *類說明:rpc框架的客戶端代理部分 */ @Service public class RpcClientFrame { /*遠端服務的代理物件,引數為客戶端要呼叫的的服務*/ // 傳遞的應該是方法所在的類名或介面名 public static<T> T getRemoteProxyObject(final Class<?> serviceInterface) throws Exception { /*獲得遠端服務的一個網路地址*/ // 修改1 // InetSocketAddress addr = new InetSocketAddress("127.0.0.1",8778); InetSocketAddress addr = getService(serviceInterface.getName()); /*拿到一個代理物件,由這個代理物件通過網路進行實際的服務呼叫*/ // 應該產生一個代理物件,和伺服器進行通訊 // 需要實現了InvocationHandler的類DynProxy return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new DynProxy(serviceInterface,addr)); } /*動態代理,實現對遠端服務的訪問*/ // 連線伺服器,傳送介面名、方法名、方法引數 private static class DynProxy implements InvocationHandler{ private Class<?> serviceInterface; private InetSocketAddress addr; public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) { this.serviceInterface = serviceInterface; this.addr = addr; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try{ // 1.建立網路連線 socket = new Socket(); socket.connect(addr); outputStream = new ObjectOutputStream(socket.getOutputStream()); // 2.傳送介面名、方法名、引數型別、引數值 //方法所在類名介面名 outputStream.writeUTF(serviceInterface.getName()); //方法的名字 outputStream.writeUTF(method.getName()); //方法的入參型別 outputStream.writeObject(method.getParameterTypes()); //方法入參的值 outputStream.writeObject(args); outputStream.flush(); // 3.接受伺服器傳回的結果 inputStream = new ObjectInputStream(socket.getInputStream()); /*接受伺服器的輸出*/ System.out.println(serviceInterface+" remote exec success!"); return inputStream.readObject(); }finally { if(socket!=null) socket.close(); if(outputStream!=null) outputStream.close(); if(inputStream!=null) inputStream.close(); } } } /*----------------以下和動態獲得服務提供者有關------------------------------*/ private static Random r = new Random(); /*獲得遠端服務的地址*/ private static InetSocketAddress getService(String serviceName) throws Exception { //獲得服務提供者的地址列表 List<InetSocketAddress> serviceVoList = getServiceList(serviceName); InetSocketAddress addr = serviceVoList.get(r.nextInt(serviceVoList.size())); System.out.println("本次選擇了伺服器:"+addr); return addr; } /*獲得服務提供者的地址*/ private static List<InetSocketAddress> getServiceList(String serviceName) throws Exception { Socket socket = null; ObjectOutputStream output = null; ObjectInputStream input = null; try{ socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1",9999)); output = new ObjectOutputStream(socket.getOutputStream()); //需要獲得服務提供者 output.writeBoolean(true); //告訴註冊中心服務名 output.writeUTF(serviceName); output.flush(); input = new ObjectInputStream(socket.getInputStream()); Set<RegisterServiceVo> result = (Set<RegisterServiceVo>)input.readObject(); List<InetSocketAddress> services = new ArrayList<>(); for(RegisterServiceVo serviceVo : result){ String host = serviceVo.getHost();//獲得服務提供者的IP int port = serviceVo.getPort();//獲得服務提供者的埠號 InetSocketAddress serviceAddr = new InetSocketAddress(host,port); services.add(serviceAddr); } System.out.println("獲得服務["+serviceName +"]提供者的地址列表["+services+"],準備呼叫."); return services; }finally{ if (socket!=null) socket.close(); if (output!=null) output.close(); if (input!=null) input.close(); } } }
-
取服務時,如果有多個,做負載均衡?
用隨機數
-
高併發RPC解決方案
-
基於TCP的RPC實現
- Dubbo:
阿里巴巴公司開源的一個高效能優秀的服務框架,使得應用可通過高效能的RPC實現服務的輸出和輸入功能,可以和Spring框架無縫整合。
- Dubbo:
-
Provider: 暴露服務的服務提供方。
-
Consumer: 呼叫遠端服務的服務消費方。
-
Registry: 服務註冊與發現的註冊中心。
-
Monitor: 統計服務的呼叫次調和呼叫時間的監控中心。
-
Container: 服務執行容器。