netty jar包_Java進階:Netty實現RPC的程式碼
技術標籤:netty jar包
一、RPC是什麼
RPC,全稱為Remote Procedure Call,即遠端過程呼叫。它允許像呼叫本地服務一樣呼叫遠端服務。
個人感覺,與http類似,都需要本地給遠端伺服器發報文,獲取返回資訊,因此記錄下兩者的區別。
RPC與http區別:
http://www.ccutu.com/244407.html
RPC可以基於TCP協議,也可以基於HTTP協議;
RPC主要用於公司內部的服務呼叫,效能消耗低,傳輸效率高,服務治理方便。HTTP主要用於對外的異構環境,瀏覽器介面呼叫,APP介面呼叫,第三方介面呼叫等。
個人理解,RPC在公司內部的分散式系統中比直接用http方式具有優勢,網路傳輸效率高,具有額外的適合分散式的一些功能(如包含負載均衡策略等),所以分散式系統內部會使用RPC。
二、Netty是什麼
Netty 是一個非同步事件驅動的網路應用程式框架,用於快速開發可維護的高效能協議伺服器和客戶端。
Netty用到了NIO(同步非阻塞)。
Netty、Servlet都屬於Web框架:https://blog.csdn.net/dengkane/article/details/84720822
因此記錄下Netty與Tomcat(Servlet)的區別:https://www.cnblogs.com/pangguoming/p/9353536.html
Servlet是基於Http協議的;Netty可以通過程式設計自定義各種協議,因為netty能夠通過程式設計自己來編碼/解碼位元組流。
個人感覺,Netty既用到了非同步非阻塞,也用到了同步非阻塞。為了避免搞混,在下方簡要記錄一下。
NIO基本概念:
1、緩衝區Buffer
Buffer是一個物件,它包含了一些要寫入的或者要讀出的資料。
緩衝區Buffer本質上是一塊可以寫入資料,然後可以從中讀取資料的記憶體。這塊記憶體被包裝成NIO Buffer物件,並提供了一組方法,用來方便的訪問該塊記憶體。
資料是從通道讀入緩衝區,從緩衝區寫入到通道中的。
2、通道Channel
Channel是一個通道,可以通過它進行資料的讀取和寫入。
資料在channel中可以進行雙向的流通,通道可以用於讀、寫、或同時進行讀寫。
3、多路複用器 Selector
selector會不斷輪詢註冊在其上的Channel,如果某個Channel上有新的Tcp接入,或者有發生讀寫事件,這個Channel就會處於就緒狀態,可以被Selector輪詢出來,然後通過selectedKey獲取就緒的Channel集合,以便進行後續的IO操作。
一個多路複用器可以同時輪詢多個Channel。
Netty用到了NIO,個人理解:
首先,NIO模型中,每個客戶端Socket請求伺服器時,不是直接與伺服器建立一個連結,而是與Selector建立一個連結,這樣就減小了伺服器的壓力。(Netty中實現了Selector)
然後,Selector會輪詢與客戶端的連結,如果有請求,就轉發給伺服器,並將伺服器返回的結果轉發給客戶端。
Netty可以有多個Selector,每個Selector與伺服器建立一個連結即可,減小了伺服器的壓力。
所以Netty用到了NIO。
Netty的所有IO操作都是非同步非阻塞的,個人理解:
首先,IO操作非同步是說,客戶端或Netty會對將資料寫入Channel,或從Channel讀取資料,這是非同步的,進行IO後不會立刻得到結果,而是實際進行IO的部件在完成後,通過狀態、通知和回撥來通知呼叫者。
非同步的好處是不會造成阻塞,在高併發情形下會更穩定和更高的吞吐量。
三、Netty實現RPC的程式碼
通過上方的分析發現,RPC協議適合用於分散式系統;Netty是NIO網路應用程式框架,可以開發客戶端程式與伺服器程式。
所以這兩個組合起來,就能開發分散式系統的客戶端程式與伺服器程式了。
程式碼樣例分3部分:
*使用下方的程式碼時,import對應的目錄結構需要自己調整下。
rpc-common:其中定義了一些客戶端與伺服器端都會用到的JavaBean類與介面類。
要實現RPC,最重要的就是要定義一個客戶端與伺服器公用的介面類;
客戶端呼叫這個介面類中的方法,看起來是直接獲得了處理結果,實際上是給伺服器傳送請求、伺服器處理後返回的結果;
伺服器則需要實現這個介面類,編寫實際執行的方法,等待客戶端請求。
rpc-consumer:這個是rpc客戶端的程式碼。
rpc-provider:這個是rpc伺服器的程式碼。
1.rpc-common部分
(1)IUserService.java,這個是客戶端與伺服器都會用到的介面類。
public interface IUserService { public String sayHello(String smg);}
(2)RpcRequest.java,這個是自定義了一個Request物件,這個物件中包含客戶端要呼叫的方法的名稱、引數等資訊,客戶端傳送這個物件給伺服器,伺服器解析這個物件。
public class RpcRequest{ //本次請求的ID,可以自定義一個 private String requestId; //客戶端準備呼叫服務端的類名 private String className; //對應類中的方法名 private String methodName; //呼叫這個方法時,要傳遞的引數型別的陣列,按順序 private Class>[] parameterTypes; //呼叫這個方法時,要傳遞的引數陣列,按順序 private Object[] parameters; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class>[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; }}
(3)Serializer.java,這個類是自定義的序列化/反序列化類,用來將java物件(RpcRequest物件)序列化和反序列化,然後就能在網路上傳輸了(也就是轉為位元組陣列,用物件流傳輸)。
import java.io.IOException;public interface Serializer { //將java物件轉換為二進位制,介面方法 //傳入物件,傳出位元組陣列 byte[] serialize(Object object) throws IOException; //將二進位制轉換成java物件,介面方法 //傳入位元組陣列與類的型別,返回該型別的物件 T deserialize(Class clazz, byte[] bytes) throws IOException;}
(4)JSONSerializer.java,這個類是Serializer.java的實現類,採用了json方式進行序列化/反序列化。
import com.alibaba.fastjson.JSON;public class JSONSerializer implements Serializer{ public byte[] serialize(Object object) { return JSON.toJSONBytes(object); } public T deserialize(Class clazz, byte[] bytes) { return JSON.parseObject(bytes, clazz); }}
(5)pom.xml,這個是rpc-common用到的依賴。同時要注意下groupId與artifactId,這個在後續的rpc-consumer與rpc-provider中會用到。
<groupId>com.testgroupId> <artifactId>rpc-commonartifactId> <version>1.0-SNAPSHOTversion> <dependencies> <dependency> <groupId>io.nettygroupId> <artifactId>netty-allartifactId> <version>4.1.16.Finalversion> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>fastjsonartifactId> <version>1.2.41version> dependency> dependencies>
2.rpc-consumer部分(rpc客戶端)
(1)pom.xml,用到的依賴,其中用到了rpc-common的jar包。
<groupId>org.examplegroupId> <artifactId>rpc-consumerartifactId> <version>1.0-SNAPSHOTversion> <dependencies> <dependency> <groupId>com.testgroupId> <artifactId>rpc-commonartifactId> <version>1.0-SNAPSHOTversion> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>fastjsonartifactId> <version>1.2.41version> dependency> dependencies>
(2)ConsumerBoot.java,客戶端的啟動類。
import com.test.client.RPCConsumer;import com.test.service.IUserService;public class ConsumerBoot { //這個欄位的意思是,要呼叫UserService類中的sayHello方法 private static final String PROVIDER_NAME = "UserService#sayHello#"; public static void main(String[] args) throws InterruptedException { //1.建立代理物件 IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME); //2.迴圈給伺服器寫資料 while (true){ //呼叫這個方法後,實際上就給伺服器發請求並獲取返回結果了,不過看起來就像呼叫本地方法一樣。這就是rpc。 String result = service.sayHello("are you ok !!"); System.out.println(result); Thread.sleep(2000); } }}
(3)RPCConsumer.java,客戶端消費者類,有建立代理物件等一系列功能。
import com.test.bean.RpcRequest;import com.test.encoder.RpcEncoder;import com.test.handler.UserClientHandler;import com.test.serializer.JSONSerializer;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 客戶端消費者類 */public class RPCConsumer { //自定義的請求id,從0開始自增 private static int requestId = 0; //1.建立一個執行緒池物件 -- 它要處理我們自定義事件 private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //2.宣告一個自定義事件處理器 UserClientHandler private static UserClientHandler userClientHandler; //3.編寫方法,初始化客戶端 ( 建立連線池 建立bootStrap 設定bootstrap 連線伺服器) public static void initClient() throws InterruptedException { //1) 初始化UserClientHandler userClientHandler = new UserClientHandler(); //2)建立連線池物件 EventLoopGroup group = new NioEventLoopGroup(); //3)建立客戶端的引導物件 Bootstrap bootstrap = new Bootstrap(); //4)配置啟動引導物件 bootstrap.group(group) //設定通道為NIO .channel(NioSocketChannel.class) //設定請求協議為TCP .option(ChannelOption.TCP_NODELAY,true) //監聽channel 並初始化 .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { //獲取ChannelPipeline ChannelPipeline pipeline = socketChannel.pipeline(); //設定如何編碼,使用自定義json序列化類編碼 pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer())); //設定如何解碼 pipeline.addLast(new StringDecoder()); //新增自定義事件處理器 pipeline.addLast(userClientHandler); } }); //5)連線服務端 bootstrap.connect("127.0.0.1",8999).sync(); } //4.編寫一個方法,使用JDK的動態代理建立物件 // serviceClass 介面型別,根據哪個介面生成子類代理物件; providerParam : "UserService#sayHello#" public static Object createProxy(Class> serviceClass, final String providerParam){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() { public Object invoke(Object o, Method method, Object[] objects) throws Throwable { //封裝好rpcRequest RpcRequest rpcRequest = getRpcRequest(providerParam, objects[0]); //1)初始化客戶端client if(userClientHandler == null){ initClient(); } //2)給UserClientHandler 設定param引數 userClientHandler.setParam(rpcRequest); //3).使用執行緒池,開啟一個執行緒處理處理call() 寫操作,並返回結果 Object result = executorService.submit(userClientHandler).get(); //4)return 結果 return result; } }); } //封裝rpcRequest物件的方法 public static RpcRequest getRpcRequest(String providerParam, Object object){ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setRequestId(String.valueOf(requestId)); requestId++; rpcRequest.setClassName(providerParam.split("#")[0]); rpcRequest.setMethodName(providerParam.split("#")[1]); rpcRequest.setParameterTypes(new Class[]{String.class}); //are you ok !! rpcRequest.setParameters(new Object[]{object}); return rpcRequest; }}
(4)RpcEncoder.java,上方設定pipeline如何編碼時用到了。
import com.test.serializer.Serializer;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;//注意繼承的是MessageToByteEncoder類public class RpcEncoder extends MessageToByteEncoder { private Class> clazz; private Serializer serializer; public RpcEncoder(Class> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; } @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception { if (clazz != null && clazz.isInstance(msg)) { byte[] bytes = serializer.serialize(msg); byteBuf.writeBytes(bytes); } }}
(5)UserClientHandler.java,自定義事件處理器。
import com.test.bean.RpcRequest;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Callable;/** * 自定義事件處理器 */public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable { //1.定義成員變數 private ChannelHandlerContext context; //事件處理器上下文物件 (儲存handler資訊,寫操作) private String result; // 記錄伺服器返回的資料 private RpcRequest rpcRequest; //將要傳送給伺服器的資料(自定義的類物件) //2.實現channelActive 客戶端和伺服器連線時,該方法就自動執行 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //初始化ChannelHandlerContext this.context = ctx; } //3.實現channelRead 當我們讀到伺服器資料,該方法自動執行 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將讀到的伺服器的資料msg ,設定為成員變數的值 result = msg.toString(); //喚醒執行緒,執行緒才會繼續執行下方的return result; notify(); } //4.將客戶端的數寫到伺服器 public synchronized Object call() throws Exception { //context給伺服器寫資料 context.writeAndFlush(rpcRequest); //使用wait阻塞自己,等待伺服器返回資料 wait(); //在上方notify()後,才繼續執行,return伺服器返回的資訊 return result; } //5.設定引數的方法 public void setParam(RpcRequest param){ this.rpcRequest = param; }}
3.rpc-provider部分(rpc伺服器)
(1)pom.xml,依賴資訊,其中用到了rpc-common的jar包;伺服器寫成了springboot專案,因此還有springboot相關依賴。
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0modelVersion> <parent> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-parentartifactId> <version>2.3.3.RELEASEversion> <relativePath/> parent> <groupId>com.testgroupId> <artifactId>rpc-providerartifactId> <version>1.0-SNAPSHOTversion> <name>myspringbootname> <description>Demo project for Spring Bootdescription> <properties> <java.version>8java.version> <mybatis-springboot-starter.version>1.3.0mybatis-springboot-starter.version> properties> <dependencies> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-webartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-testartifactId> <scope>testscope> <exclusions> <exclusion> <groupId>org.junit.vintagegroupId> <artifactId>junit-vintage-engineartifactId> exclusion> exclusions> dependency> <dependency> <groupId>com.testgroupId> <artifactId>rpc-commonartifactId> <version>1.0-SNAPSHOTversion> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>fastjsonartifactId> <version>1.2.41version> dependency> dependencies> <build> <plugins> <plugin> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-maven-pluginartifactId> plugin> plugins> build>project>
(2)ServerBootstrap.java,rpc服務端的啟動類。
import com.test.service.IUserService;import com.test.service.UserServiceImpl;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ServerBootstrap { @Autowired @Qualifier("UserService") IUserService userServiceImpl; public static void main(String[] args) throws InterruptedException { //啟動spring容器 SpringApplication.run(ServerBootstrap.class, args); //啟動伺服器 UserServiceImpl.startServer("127.0.0.1",8999); }}
(3)UserServiceImpl.java,實現類,客戶端相當於實際呼叫了這個類中的方法。(客戶端給伺服器發請求,伺服器呼叫方法,然後將結果返回給客戶端。)
import com.test.bean.RpcRequest;import com.test.decoder.RpcDecoder;import com.test.handler.UserServiceHandler;import com.test.serializer.JSONSerializer;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Service;@Service("UserService")public class UserServiceImpl implements IUserService,ApplicationContextAware { //spring容器物件,通過實現ApplicationContextAware獲取到 private static ApplicationContext applicationContext; //將來客戶端要遠端呼叫的方法 public String sayHello(String msg) { System.out.println("客戶端發來資料:"+msg); return "伺服器返回資料 : "+msg+"[success]"; } //建立一個方法啟動伺服器 public static void startServer(String ip , int port) throws InterruptedException { //1.建立兩個執行緒池物件 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); //2.建立服務端的啟動引導物件 ServerBootstrap serverBootstrap = new ServerBootstrap(); //3.配置啟動引導物件 serverBootstrap.group(bossGroup,workGroup) //設定通道為NIO .channel(NioServerSocketChannel.class) //建立監聽channel .childHandler(new ChannelInitializer() { protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { //獲取管道物件 ChannelPipeline pipeline = nioSocketChannel.pipeline(); //給管道物件pipeLine 設定編碼方式 pipeline.addLast(new StringEncoder()); //給管道物件pipeLine 設定解碼方式,使用自定義json方式解碼 pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer())); //把自定義的一個ChannelHander新增到通道中 pipeline.addLast(new UserServiceHandler(applicationContext)); } }); //4.繫結埠 serverBootstrap.bind(8999).sync(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}
(4)UserServiceHandler.java,自定義業務處理器,包含處理從客戶端收到的資訊的方法,以及返回給客戶端資訊的方法。
import com.test.bean.RpcRequest;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import org.springframework.context.ApplicationContext;import java.lang.reflect.Method;/** * 自定義的業務處理器 */public class UserServiceHandler extends ChannelInboundHandlerAdapter { //spring容器物件 private ApplicationContext applicationContext; //構造方法,給spring容器物件賦值 public UserServiceHandler(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } //當客戶端發來資料時,該方法會被呼叫 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //將客戶端發來的物件強轉為RpcRequest。(約定好的,本來就是RpcRequest) RpcRequest rpcRequest = (RpcRequest)msg; //從spring容器中獲得要呼叫的類,名字是UserService,實際的型別是UserServiceImpl Object bean = applicationContext.getBean(rpcRequest.getClassName()); //用反射獲得UserServiceImpl物件 //com.test.UserServiceImpl //要注意這個類的路徑 Class c = Class.forName(bean.getClass().getPackage().getName()+ "." + rpcRequest.getClassName() +"Impl"); //然後用反射獲得這個類的方法 //sayHello Method method = c.getDeclaredMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); System.out.println("當前請求號:"+rpcRequest.getRequestId()); //注意method中傳入的是spring容器中取出的bean,而不是c.newInstance() //are you ok!! Object invoke = method.invoke(bean, rpcRequest.getParameters()); //把呼叫實現類的方法獲得的結果寫到客戶端 ctx.writeAndFlush(invoke); }}
(5)RpcDecoder.java,自定義json格式解碼用到的類。
import com.test.serializer.Serializer;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;//注意解碼繼承的是ByteToMessageDecoderpublic class RpcDecoder extends ByteToMessageDecoder { private Class> clazz; private Serializer serializer; public RpcDecoder(Class> clazz, Serializer serializer) { this.clazz = clazz; this.serializer = serializer; } @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { if (clazz != null && serializer != null) { byte[] bytes = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(bytes); //如果不想改變byteBuf的讀寫位置,可以用getBytes //int readerIndex = byteBuf.readerIndex(); //byteBuf.getBytes(readerIndex, bytes); //這個就是RpcRequest物件 Object obj = serializer.deserialize(clazz, bytes); //注意這裡只添加了一個RpcRequest物件 //如果list中添加了多個元素,則handler中的channelRead(ChannelHandlerContext ctx, Object msg)方法會被多次呼叫,按順序一次給msg傳入一個元素 list.add(obj); } }}
以上,Netty實現RPC的程式碼樣例就完成了。