1. 程式人生 > 其它 >netty jar包_Java進階:Netty實現RPC的程式碼

netty jar包_Java進階:Netty實現RPC的程式碼

技術標籤:netty jar包

一、RPC是什麼

RPC,全稱為Remote Procedure Call,即遠端過程呼叫。它允許像呼叫本地服務一樣呼叫遠端服務。

個人感覺,與http類似,都需要本地給遠端伺服器發報文,獲取返回資訊,因此記錄下兩者的區別。

RPC與http區別:

http://www.ccutu.com/244407.html

RPC可以基於TCP協議,也可以基於HTTP協議;

RPC主要用於公司內部的服務呼叫,效能消耗低,傳輸效率高,服務治理方便。HTTP主要用於對外的異構環境,瀏覽器介面呼叫,APP介面呼叫,第三方介面呼叫等。

個人理解,RPC在公司內部的分散式系統中比直接用http方式具有優勢,網路傳輸效率高,具有額外的適合分散式的一些功能(如包含負載均衡策略等),所以分散式系統內部會使用RPC。

二、Netty是什麼

Netty 是一個非同步事件驅動的網路應用程式框架,用於快速開發可維護的高效能協議伺服器和客戶端。

Netty用到了NIO(同步非阻塞)。

Netty、Servlet都屬於Web框架:https://blog.csdn.net/dengkane/article/details/84720822

因此記錄下Netty與Tomcat(Servlet)的區別:https://www.cnblogs.com/pangguoming/p/9353536.html

Servlet是基於Http協議的;Netty可以通過程式設計自定義各種協議,因為netty能夠通過程式設計自己來編碼/解碼位元組流。

個人感覺,Netty既用到了非同步非阻塞,也用到了同步非阻塞。為了避免搞混,在下方簡要記錄一下。

NIO基本概念:

1、緩衝區Buffer

Buffer是一個物件,它包含了一些要寫入的或者要讀出的資料。

緩衝區Buffer本質上是一塊可以寫入資料,然後可以從中讀取資料的記憶體。這塊記憶體被包裝成NIO Buffer物件,並提供了一組方法,用來方便的訪問該塊記憶體。

資料是從通道讀入緩衝區,從緩衝區寫入到通道中的。

2、通道Channel

Channel是一個通道,可以通過它進行資料的讀取和寫入。

資料在channel中可以進行雙向的流通,通道可以用於讀、寫、或同時進行讀寫。

3、多路複用器 Selector

selector會不斷輪詢註冊在其上的Channel,如果某個Channel上有新的Tcp接入,或者有發生讀寫事件,這個Channel就會處於就緒狀態,可以被Selector輪詢出來,然後通過selectedKey獲取就緒的Channel集合,以便進行後續的IO操作。

一個多路複用器可以同時輪詢多個Channel。

Netty用到了NIO,個人理解:

首先,NIO模型中,每個客戶端Socket請求伺服器時,不是直接與伺服器建立一個連結,而是與Selector建立一個連結,這樣就減小了伺服器的壓力。(Netty中實現了Selector)

然後,Selector會輪詢與客戶端的連結,如果有請求,就轉發給伺服器,並將伺服器返回的結果轉發給客戶端。

Netty可以有多個Selector,每個Selector與伺服器建立一個連結即可,減小了伺服器的壓力。

所以Netty用到了NIO。

Netty的所有IO操作都是非同步非阻塞的,個人理解:

首先,IO操作非同步是說,客戶端或Netty會對將資料寫入Channel,或從Channel讀取資料,這是非同步的,進行IO後不會立刻得到結果,而是實際進行IO的部件在完成後,通過狀態、通知和回撥來通知呼叫者。

非同步的好處是不會造成阻塞,在高併發情形下會更穩定和更高的吞吐量。

三、Netty實現RPC的程式碼

通過上方的分析發現,RPC協議適合用於分散式系統;Netty是NIO網路應用程式框架,可以開發客戶端程式與伺服器程式。

所以這兩個組合起來,就能開發分散式系統的客戶端程式與伺服器程式了。

程式碼樣例分3部分:

570bee0acbc360963ee49176b8cb6846.png

*使用下方的程式碼時,import對應的目錄結構需要自己調整下。

rpc-common:其中定義了一些客戶端與伺服器端都會用到的JavaBean類與介面類。

要實現RPC,最重要的就是要定義一個客戶端與伺服器公用的介面類;

客戶端呼叫這個介面類中的方法,看起來是直接獲得了處理結果,實際上是給伺服器傳送請求、伺服器處理後返回的結果;

伺服器則需要實現這個介面類,編寫實際執行的方法,等待客戶端請求。

rpc-consumer:這個是rpc客戶端的程式碼。

rpc-provider:這個是rpc伺服器的程式碼。

1.rpc-common部分

(1)IUserService.java,這個是客戶端與伺服器都會用到的介面類。

public interface IUserService {    public String sayHello(String smg);}

(2)RpcRequest.java,這個是自定義了一個Request物件,這個物件中包含客戶端要呼叫的方法的名稱、引數等資訊,客戶端傳送這個物件給伺服器,伺服器解析這個物件。

public class RpcRequest{    //本次請求的ID,可以自定義一個    private String requestId;    //客戶端準備呼叫服務端的類名    private String className;    //對應類中的方法名    private String methodName;    //呼叫這個方法時,要傳遞的引數型別的陣列,按順序    private Class>[] parameterTypes;    //呼叫這個方法時,要傳遞的引數陣列,按順序    private Object[] parameters;    public String getRequestId() {        return requestId;    }    public void setRequestId(String requestId) {        this.requestId = requestId;    }    public String getClassName() {        return className;    }    public void setClassName(String className) {        this.className = className;    }    public String getMethodName() {        return methodName;    }    public void setMethodName(String methodName) {        this.methodName = methodName;    }    public Class>[] getParameterTypes() {        return parameterTypes;    }    public void setParameterTypes(Class>[] parameterTypes) {        this.parameterTypes = parameterTypes;    }    public Object[] getParameters() {        return parameters;    }    public void setParameters(Object[] parameters) {        this.parameters = parameters;    }}

(3)Serializer.java,這個類是自定義的序列化/反序列化類,用來將java物件(RpcRequest物件)序列化和反序列化,然後就能在網路上傳輸了(也就是轉為位元組陣列,用物件流傳輸)。

import java.io.IOException;public interface Serializer {    //將java物件轉換為二進位制,介面方法    //傳入物件,傳出位元組陣列    byte[] serialize(Object object) throws IOException;    //將二進位制轉換成java物件,介面方法    //傳入位元組陣列與類的型別,返回該型別的物件    T deserialize(Class clazz, byte[] bytes) throws IOException;}

(4)JSONSerializer.java,這個類是Serializer.java的實現類,採用了json方式進行序列化/反序列化。

import com.alibaba.fastjson.JSON;public class JSONSerializer implements Serializer{    public byte[] serialize(Object object) {        return JSON.toJSONBytes(object);    }    public T deserialize(Class clazz, byte[] bytes) {        return JSON.parseObject(bytes, clazz);    }}

(5)pom.xml,這個是rpc-common用到的依賴。同時要注意下groupId與artifactId,這個在後續的rpc-consumer與rpc-provider中會用到。

<groupId>com.testgroupId>    <artifactId>rpc-commonartifactId>    <version>1.0-SNAPSHOTversion>   <dependencies>       <dependency>           <groupId>io.nettygroupId>           <artifactId>netty-allartifactId>           <version>4.1.16.Finalversion>       dependency>       <dependency>           <groupId>com.alibabagroupId>           <artifactId>fastjsonartifactId>           <version>1.2.41version>       dependency>   dependencies>

2.rpc-consumer部分(rpc客戶端)

(1)pom.xml,用到的依賴,其中用到了rpc-common的jar包。

<groupId>org.examplegroupId>    <artifactId>rpc-consumerartifactId>    <version>1.0-SNAPSHOTversion>    <dependencies>        <dependency>            <groupId>com.testgroupId>            <artifactId>rpc-commonartifactId>            <version>1.0-SNAPSHOTversion>        dependency>        <dependency>            <groupId>com.alibabagroupId>            <artifactId>fastjsonartifactId>            <version>1.2.41version>        dependency>    dependencies>

(2)ConsumerBoot.java,客戶端的啟動類。

import com.test.client.RPCConsumer;import com.test.service.IUserService;public class ConsumerBoot {    //這個欄位的意思是,要呼叫UserService類中的sayHello方法    private static final String PROVIDER_NAME = "UserService#sayHello#";    public static void main(String[] args) throws InterruptedException {        //1.建立代理物件        IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDER_NAME);        //2.迴圈給伺服器寫資料        while (true){            //呼叫這個方法後,實際上就給伺服器發請求並獲取返回結果了,不過看起來就像呼叫本地方法一樣。這就是rpc。            String result = service.sayHello("are you ok !!");            System.out.println(result);            Thread.sleep(2000);        }    }}

(3)RPCConsumer.java,客戶端消費者類,有建立代理物件等一系列功能。

import com.test.bean.RpcRequest;import com.test.encoder.RpcEncoder;import com.test.handler.UserClientHandler;import com.test.serializer.JSONSerializer;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * 客戶端消費者類 */public class RPCConsumer {    //自定義的請求id,從0開始自增    private static int requestId = 0;    //1.建立一個執行緒池物件  -- 它要處理我們自定義事件    private static ExecutorService executorService =            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());    //2.宣告一個自定義事件處理器  UserClientHandler    private static UserClientHandler userClientHandler;    //3.編寫方法,初始化客戶端  ( 建立連線池  建立bootStrap  設定bootstrap  連線伺服器)    public static void initClient() throws InterruptedException {        //1) 初始化UserClientHandler        userClientHandler  = new UserClientHandler();        //2)建立連線池物件        EventLoopGroup group = new NioEventLoopGroup();        //3)建立客戶端的引導物件        Bootstrap bootstrap =  new Bootstrap();        //4)配置啟動引導物件        bootstrap.group(group)                //設定通道為NIO                .channel(NioSocketChannel.class)                //設定請求協議為TCP                .option(ChannelOption.TCP_NODELAY,true)                //監聽channel 並初始化                .handler(new ChannelInitializer<SocketChannel>() {                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        //獲取ChannelPipeline                        ChannelPipeline pipeline = socketChannel.pipeline();                        //設定如何編碼,使用自定義json序列化類編碼                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));                        //設定如何解碼                        pipeline.addLast(new StringDecoder());                        //新增自定義事件處理器                        pipeline.addLast(userClientHandler);                    }                });        //5)連線服務端        bootstrap.connect("127.0.0.1",8999).sync();    }    //4.編寫一個方法,使用JDK的動態代理建立物件    // serviceClass 介面型別,根據哪個介面生成子類代理物件;   providerParam :  "UserService#sayHello#"    public static Object createProxy(Class> serviceClass, final String providerParam){        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),                new Class[]{serviceClass}, new InvocationHandler() {                    public Object invoke(Object o, Method method, Object[] objects) throws Throwable {                        //封裝好rpcRequest                        RpcRequest rpcRequest = getRpcRequest(providerParam, objects[0]);                        //1)初始化客戶端client                        if(userClientHandler == null){                            initClient();                        }                        //2)給UserClientHandler 設定param引數                        userClientHandler.setParam(rpcRequest);                        //3).使用執行緒池,開啟一個執行緒處理處理call() 寫操作,並返回結果                        Object result = executorService.submit(userClientHandler).get();                        //4)return 結果                        return result;                    }                });    }    //封裝rpcRequest物件的方法    public static RpcRequest getRpcRequest(String providerParam, Object object){        RpcRequest rpcRequest = new RpcRequest();        rpcRequest.setRequestId(String.valueOf(requestId));        requestId++;        rpcRequest.setClassName(providerParam.split("#")[0]);        rpcRequest.setMethodName(providerParam.split("#")[1]);        rpcRequest.setParameterTypes(new Class[]{String.class});        //are you ok !!        rpcRequest.setParameters(new Object[]{object});        return rpcRequest;    }}

(4)RpcEncoder.java,上方設定pipeline如何編碼時用到了。

import com.test.serializer.Serializer;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;//注意繼承的是MessageToByteEncoder類public class RpcEncoder extends MessageToByteEncoder {    private Class> clazz;    private Serializer serializer;    public RpcEncoder(Class> clazz, Serializer serializer) {        this.clazz = clazz;        this.serializer = serializer;    }    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {        if (clazz != null && clazz.isInstance(msg)) {            byte[] bytes = serializer.serialize(msg);            byteBuf.writeBytes(bytes);        }    }}

(5)UserClientHandler.java,自定義事件處理器。

import com.test.bean.RpcRequest;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.Callable;/** *  自定義事件處理器 */public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {    //1.定義成員變數    private ChannelHandlerContext context; //事件處理器上下文物件 (儲存handler資訊,寫操作)    private String result; // 記錄伺服器返回的資料    private RpcRequest rpcRequest; //將要傳送給伺服器的資料(自定義的類物件)    //2.實現channelActive  客戶端和伺服器連線時,該方法就自動執行    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //初始化ChannelHandlerContext        this.context = ctx;    }    //3.實現channelRead 當我們讀到伺服器資料,該方法自動執行    @Override    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //將讀到的伺服器的資料msg ,設定為成員變數的值        result = msg.toString();        //喚醒執行緒,執行緒才會繼續執行下方的return result;        notify();    }    //4.將客戶端的數寫到伺服器    public synchronized Object call() throws Exception {        //context給伺服器寫資料        context.writeAndFlush(rpcRequest);        //使用wait阻塞自己,等待伺服器返回資料        wait();        //在上方notify()後,才繼續執行,return伺服器返回的資訊        return result;    }    //5.設定引數的方法    public void setParam(RpcRequest param){        this.rpcRequest = param;    }}

3.rpc-provider部分(rpc伺服器)

(1)pom.xml,依賴資訊,其中用到了rpc-common的jar包;伺服器寫成了springboot專案,因此還有springboot相關依賴。

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0modelVersion>  <parent>    <groupId>org.springframework.bootgroupId>    <artifactId>spring-boot-starter-parentartifactId>    <version>2.3.3.RELEASEversion>    <relativePath/>   parent>  <groupId>com.testgroupId>  <artifactId>rpc-providerartifactId>  <version>1.0-SNAPSHOTversion>  <name>myspringbootname>  <description>Demo project for Spring Bootdescription>  <properties>    <java.version>8java.version>    <mybatis-springboot-starter.version>1.3.0mybatis-springboot-starter.version>  properties>  <dependencies>    <dependency>      <groupId>org.springframework.bootgroupId>      <artifactId>spring-boot-starter-webartifactId>    dependency>    <dependency>      <groupId>org.springframework.bootgroupId>      <artifactId>spring-boot-starter-testartifactId>      <scope>testscope>      <exclusions>        <exclusion>          <groupId>org.junit.vintagegroupId>          <artifactId>junit-vintage-engineartifactId>        exclusion>      exclusions>    dependency>    <dependency>      <groupId>com.testgroupId>      <artifactId>rpc-commonartifactId>      <version>1.0-SNAPSHOTversion>    dependency>    <dependency>      <groupId>com.alibabagroupId>      <artifactId>fastjsonartifactId>      <version>1.2.41version>    dependency>    dependencies>  <build>    <plugins>      <plugin>        <groupId>org.springframework.bootgroupId>        <artifactId>spring-boot-maven-pluginartifactId>      plugin>    plugins>  build>project>

(2)ServerBootstrap.java,rpc服務端的啟動類。

import com.test.service.IUserService;import com.test.service.UserServiceImpl;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class ServerBootstrap {  @Autowired    @Qualifier("UserService")  IUserService userServiceImpl;  public static void main(String[] args) throws InterruptedException {    //啟動spring容器    SpringApplication.run(ServerBootstrap.class, args);    //啟動伺服器    UserServiceImpl.startServer("127.0.0.1",8999);  }}

(3)UserServiceImpl.java,實現類,客戶端相當於實際呼叫了這個類中的方法。(客戶端給伺服器發請求,伺服器呼叫方法,然後將結果返回給客戶端。)

import com.test.bean.RpcRequest;import com.test.decoder.RpcDecoder;import com.test.handler.UserServiceHandler;import com.test.serializer.JSONSerializer;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;import org.springframework.stereotype.Service;@Service("UserService")public class UserServiceImpl implements IUserService,ApplicationContextAware {    //spring容器物件,通過實現ApplicationContextAware獲取到    private static ApplicationContext applicationContext;    //將來客戶端要遠端呼叫的方法    public String sayHello(String msg) {        System.out.println("客戶端發來資料:"+msg);        return "伺服器返回資料 : "+msg+"[success]";    }    //建立一個方法啟動伺服器    public static void startServer(String ip , int port) throws InterruptedException {        //1.建立兩個執行緒池物件        NioEventLoopGroup  bossGroup = new NioEventLoopGroup();        NioEventLoopGroup  workGroup = new NioEventLoopGroup();        //2.建立服務端的啟動引導物件        ServerBootstrap serverBootstrap = new ServerBootstrap();        //3.配置啟動引導物件        serverBootstrap.group(bossGroup,workGroup)                //設定通道為NIO                .channel(NioServerSocketChannel.class)                //建立監聽channel                .childHandler(new ChannelInitializer() {                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {                        //獲取管道物件                        ChannelPipeline pipeline = nioSocketChannel.pipeline();                        //給管道物件pipeLine 設定編碼方式                        pipeline.addLast(new StringEncoder());                        //給管道物件pipeLine 設定解碼方式,使用自定義json方式解碼                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));                        //把自定義的一個ChannelHander新增到通道中                        pipeline.addLast(new UserServiceHandler(applicationContext));                    }                });        //4.繫結埠        serverBootstrap.bind(8999).sync();    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }}

(4)UserServiceHandler.java,自定義業務處理器,包含處理從客戶端收到的資訊的方法,以及返回給客戶端資訊的方法。

import com.test.bean.RpcRequest;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import org.springframework.context.ApplicationContext;import java.lang.reflect.Method;/** * 自定義的業務處理器 */public class UserServiceHandler extends ChannelInboundHandlerAdapter {    //spring容器物件    private ApplicationContext applicationContext;    //構造方法,給spring容器物件賦值    public UserServiceHandler(ApplicationContext applicationContext) {        this.applicationContext = applicationContext;    }    //當客戶端發來資料時,該方法會被呼叫    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //將客戶端發來的物件強轉為RpcRequest。(約定好的,本來就是RpcRequest)        RpcRequest rpcRequest = (RpcRequest)msg;        //從spring容器中獲得要呼叫的類,名字是UserService,實際的型別是UserServiceImpl        Object bean = applicationContext.getBean(rpcRequest.getClassName());        //用反射獲得UserServiceImpl物件        //com.test.UserServiceImpl        //要注意這個類的路徑        Class c = Class.forName(bean.getClass().getPackage().getName()+ "." + rpcRequest.getClassName() +"Impl");        //然後用反射獲得這個類的方法        //sayHello        Method method = c.getDeclaredMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());        System.out.println("當前請求號:"+rpcRequest.getRequestId());        //注意method中傳入的是spring容器中取出的bean,而不是c.newInstance()        //are you ok!!        Object invoke = method.invoke(bean, rpcRequest.getParameters());        //把呼叫實現類的方法獲得的結果寫到客戶端        ctx.writeAndFlush(invoke);    }}

(5)RpcDecoder.java,自定義json格式解碼用到的類。

import com.test.serializer.Serializer;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;//注意解碼繼承的是ByteToMessageDecoderpublic class RpcDecoder extends ByteToMessageDecoder {    private Class> clazz;    private Serializer serializer;    public RpcDecoder(Class> clazz, Serializer serializer) {        this.clazz = clazz;        this.serializer = serializer;    }    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {        if (clazz != null && serializer != null) {            byte[] bytes = new byte[byteBuf.readableBytes()];            byteBuf.readBytes(bytes);            //如果不想改變byteBuf的讀寫位置,可以用getBytes            //int readerIndex = byteBuf.readerIndex();            //byteBuf.getBytes(readerIndex, bytes);            //這個就是RpcRequest物件            Object obj = serializer.deserialize(clazz, bytes);            //注意這裡只添加了一個RpcRequest物件            //如果list中添加了多個元素,則handler中的channelRead(ChannelHandlerContext ctx, Object msg)方法會被多次呼叫,按順序一次給msg傳入一個元素            list.add(obj);        }    }}

以上,Netty實現RPC的程式碼樣例就完成了。