1. 程式人生 > 實用技巧 >Java+Nettty自定義RPC框架

Java+Nettty自定義RPC框架

本次利用Java+netty實現自定義rpc框架,共分為三個工程,公共模組+服務提供者+服務消費者:
 

rpc-common工程
 
pom.xml
 
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rpc.common</groupId>
    <artifactId>rpc-common</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
 
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <dependency>
 
            <groupId>com.alibaba</groupId>
 
            <artifactId>fastjson</artifactId>
 
            <version>1.2.41</version>
 
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
 
 
RpcDecoder.java
 
 
package com.rpc.decoder; import java.util.List; import com.rpc.util.SerializationUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * * @author linxu * */ public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) {
return; } in.markReaderIndex(); int dataLength = in.readInt(); if (dataLength < 0) { ctx.close(); } if (in.readableBytes() < dataLength) { in.resetReaderIndex(); } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = SerializationUtil.toClass(genericClass, data); out.add(obj); } } RpcEncoder.java package com.rpc.decoder; import com.rpc.util.SerializationUtil; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * * @author linxu * */ @SuppressWarnings("rawtypes") public class RpcEncoder extends MessageToByteEncoder { private Class<?> genericClass; public RpcEncoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (genericClass.isInstance(msg)) { byte[] data = SerializationUtil.toByte(msg); out.writeInt(data.length); out.writeBytes(data); } } } SrpcRequest.java package com.rpc.message; import java.io.Serializable; import java.util.Arrays; /** * * @author linxu * */ public class SrpcRequest implements Serializable{ private static final long serialVersionUID = 6132853628325824727L; // 請求Id private String requestId; // 遠端呼叫介面名稱 private String interfaceName; //遠端呼叫方法名稱 private String methodName; // 引數型別 private Class<?>[] parameterTypes; // 引數值 private Object[] parameters; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class<?>[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } @Override public String toString() { return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName + ", methodName=" + methodName + ", parameterTypes=" + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters) + "]"; } } SrpcResponse.java package com.rpc.message; import java.io.Serializable; /** * * @author linxu * */ public class SrpcResponse implements Serializable{ private static final long serialVersionUID = -5934073769679010930L; // 請求的Id private String requestId; // 異常 private Throwable error; // 響應 private Object result; public String getRequestId() { return requestId; } public void setRequestId(String requestId) { this.requestId = requestId; } public Throwable getError() { return error; } public void setError(Throwable error) { this.error = error; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } @Override public String toString() { return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result + "]"; } } SerializationUtil.java package com.rpc.util; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; /** * * @author 86136 * */ public class SerializationUtil { /** * 序列化 * * @param t * @return */ public static <T> byte[] toByte(T t) { ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = null; try { o = new ObjectOutputStream(b); o.writeObject(t); return b.toByteArray(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (b != null) { try { b.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (o != null) { try { o.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } return null; } /** * 反序列 * * @param clazz * @param buffer * @return * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T toClass(Class<T> clazz, byte[] buffer) throws Exception { ByteArrayInputStream i = new ByteArrayInputStream(buffer); ObjectInputStream o = null; try { o = new ObjectInputStream(i); return (T) o.readObject(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { if (i != null) { i.close(); } if (o != null) { o.close(); } } return null; } } DeptService.java package com.user.service; public interface DeptService { public String selectDept(String d); } UserService.java package com.user.service; public interface UserService { String sayHello(String word); } rpc-consumer工程 ClientBootstrap.java package com.rpc; import com.user.service.DeptService; import com.user.service.UserService; /** * 呼叫測試 * * @author linxu * */ public class ClientBootstrap { public static void main(String[] args) throws InterruptedException { test1(); test2(); } public static void test1() { RpcConsumer consumer = new RpcConsumer(); UserService service = (UserService) consumer.createProxy(UserService.class); System.out.println(service.sayHello("are you ok 001 ?")); } public static void test2() { RpcConsumer consumer = new RpcConsumer(); DeptService service = (DeptService) consumer.createProxy(DeptService.class); System.out.println(service.selectDept("are you ok 002 ?")); } } ClientHandler.java package com.rpc; import java.util.concurrent.Callable; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> { private ChannelHandlerContext context; private SrpcResponse result; private SrpcRequest para; @Override public void channelActive(ChannelHandlerContext ctx) { context = ctx; } @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) { result = (SrpcResponse)msg; notify(); } @Override public synchronized Object call() throws InterruptedException { context.writeAndFlush(para); wait(); return result; } void setPara(SrpcRequest para) { this.para = para; } } RpcConsumer.java package com.rpc; import java.lang.reflect.Proxy; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.commons.lang3.StringUtils; import com.rpc.decoder.RpcDecoder; import com.rpc.decoder.RpcEncoder; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class RpcConsumer { private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static ClientHandler client; /** * 代理物件去執行了一個socket連線請求, */ public Object createProxy(final Class<?> interfaceClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { interfaceClass }, (proxy, method, arguments) -> { if (client == null) { initClient(); } //請求封裝 SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); client.setPara(request); //請求結果 SrpcResponse response = (SrpcResponse) executor.submit(client).get(); if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) { return null; } if (response.getError() != null) { throw response.getError(); } return response.getResult(); }); } private static void initClient() { client = new ClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new RpcEncoder(SrpcRequest.class)); p.addLast(new RpcDecoder(SrpcResponse.class)); p.addLast(client); } }); try { b.connect("localhost", 8888).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.rpc.consumer</groupId> <artifactId>rpc-consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rpc.common</groupId> <artifactId>rpc-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project> rpc-provider工程 Bootstrap.java package com.rpc.bootstrap; import java.util.HashMap; import java.util.Map; import com.rpc.decoder.RpcDecoder; import com.rpc.decoder.RpcEncoder; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @SuppressWarnings({ }) public class Bootstrap { // 伺服器介面容器 @SuppressWarnings({ }) public static final Map<String, Object> serviceRegistry = new HashMap<String, Object>(); public static void startServer(String hostName, int port) { try { NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new RpcDecoder(SrpcRequest.class)); p.addLast(new RpcEncoder(SrpcResponse.class)); p.addLast(new RpcServerHandler()); } }); bootstrap.bind(hostName, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { //服務註冊 registryService(); //netty啟動 startServer("localhost", 8888); } public static void registryService() { final Map<String, String> serviceInterfaceConfiguration = ServiceRegistry.registry(); if (serviceInterfaceConfiguration != null && !serviceInterfaceConfiguration.isEmpty()) { serviceInterfaceConfiguration.forEach((k, v) -> { try { @SuppressWarnings("deprecation") Object object = Class.forName(v).newInstance(); serviceRegistry.put(k, object); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InstantiationException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (IllegalAccessException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } } } RpcServerHandler.java package com.rpc.bootstrap; import java.lang.reflect.Method; import com.rpc.message.SrpcRequest; import com.rpc.message.SrpcResponse; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> { @Override protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest msg) throws Exception { SrpcResponse response = new SrpcResponse(); response.setRequestId(msg.getRequestId()); try { response.setResult(handle(msg)); } catch (Exception e) { response.setError(e); e.printStackTrace(); } ctx.writeAndFlush(response); } /** * 執行服務介面方法 * * @param request * @return * @throws Exception */ private Object handle(SrpcRequest request) throws Exception { Object service = Bootstrap.serviceRegistry.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); return method.invoke(service, request.getParameters()); } } ServiceRegistry.java package com.rpc.bootstrap; import java.util.HashMap; import java.util.Map; public class ServiceRegistry { public static Map<String,String>serviceInterfaceConfiguration=new HashMap<String, String>(); public static void put(String k,String v) { serviceInterfaceConfiguration.put(k, v); } public static Map<String,String> registry() { //服務介面註冊 put("com.user.service.UserService", "com.user.serviceimp.UserServiceImpl"); put("com.user.service.DeptService", "com.user.serviceimp.DeptServiceImpl"); return serviceInterfaceConfiguration; } } DeptServiceImpl.java package com.user.serviceimp; import com.user.service.DeptService; public class DeptServiceImpl implements DeptService{ @Override public String selectDept(String d) { return d+":successful"; } } UserServiceImpl.java package com.user.serviceimp; import com.user.service.UserService; public class UserServiceImpl implements UserService{ @Override public String sayHello(String word) { // TODO Auto-generated method stub System.err.println("呼叫成功"+word); return "呼叫成功"+word; } } pom,xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.rpc.provider</groupId> <artifactId>rpc-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rpc.common</groupId> <artifactId>rpc-common</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build> </project>