Dubbo教程-04-生擼一個RPC
為什麼要生擼?
這個問題就好比
我自己已經有老婆了
為什麼還要自己做飯 ?
好像這個例子 舉的不是很恰當
主要是 為了 理解下 RPC 的一個具體程式設計模型
和他實現的一些細節
其實就是一個程式設計模型的 理解 和 實踐 過程
基於netty框架
我們之前 學過netty框架的一個 程式設計模型
server client
基於事件驅動的模式。
上一個例子我們 把 資料傳遞到伺服器
然後 伺服器給我們返回資料
中間通過 netty的網路連線 實現打通
那麼我們就會想 是否可以 把 傳遞過去的資料
變成一個抽象,
然後 伺服器端的 資料獲取 及處理 程式設計 具體的 實現類
客戶端的 模擬呼叫
變成面向介面 ?
簡單畫了個圖片給大家理解下
說白了就是 客戶端 現在 要利用 service 介面
動態生成代理物件
而動態代理的實現細節中加入 和 網路互動
把 具體的程式碼實現放到了遠端伺服器。
遠端伺服器把結果通過網路返回給客戶端
客戶端再交由代理物件返回
於是我們感知到的就是
真的返回
看起來很你牛逼的樣子
不錯確實很牛逼
程式碼給我看看
Service.java
package cn.bywind.rpc.v_one;
public interface Service {
String sayHelloWithName(String name);
}
這個就是一個服務介面咯
沒啥可說的
他接收一個引數
然後返回一個字串。
ServiceProvider.java
package cn.bywind.rpc.v_one;
public class ServiceProvider implements Service {
@Override
public String sayHelloWithName(String name) {
return "hello "+name;
}
}
這個是 service 的具體實現
沒啥可說的
就是拿到了 入參 然後 稍微處理下
加了個hello 然後返回了
我們這裡只是模擬下,不要以為我不會寫程式碼啊
ServiceServer.java
package cn.bywind.rpc.v_one;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ServiceServer {
private int port = 0;
public ServiceServer(int port) {
this.port = port;
}
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
public void run(){
System.out.println("running on port :"+port);
try {
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new ServiceProviderHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
public static void main(String[] args) {
ServiceServer serviceServer = new ServiceServer(9999);
serviceServer.run();
}
}
這個就是一個程式設計模型
netty作為網路伺服器
這個類就是一個 server端的程式碼
萬年不變的哦
都是這麼寫的
大家可以在網上找到很多類似程式碼
不過我這個是 參照 netty官網 規範寫的啊
大家可以借鑑下
server端 我們定有兩個 NioEventLoopGroup
一個是boss 一個是 worker
boss 接收請求
worker 處理請求
別的沒啥可說的了 照著寫
ServiceProviderHandler.java
package cn.bywind.rpc.v_one;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServiceProviderHandler extends ChannelHandlerAdapter {
private static final Service SERVICE = new ServiceProvider();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String s = msg.toString();
System.out.println("get str from client:"+s);
ctx.writeAndFlush(SERVICE.sayHelloWithName(s));
}
}
這個類正式 如他所說的 handler 或者是 adapter
其實 netty也看到了這個 地方的不妥
所以他們現在都叫 adapter了
你可以去看最新的 netty程式碼 之前 這部分 是 handler
現在都叫 adapter
了
這個地方就是 讓 具體實現 接收網路傳輸資料
然後 自己 邏輯處理下
然後 通過網路傳出 結果資料
這個地方 我們稱為 adapter
ServiceClient.java
package cn.bywind.rpc.v_one;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServiceClient {
private ServiceConsumerHandler handler ;
private static ExecutorService executor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public Object createProxy(final Class<?> serviceClass){
Object o = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(handler == null){
startClient();
}
handler.setParams(args[0].toString());
return executor.submit(handler).get();
}
});
return o;
}
public void startClient(){
handler = new ServiceConsumerHandler();
try {
NioEventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(handler);
}
});
bootstrap.connect("127.0.0.1", 9999).sync();
}catch (Exception e){
e.printStackTrace();
}
}
}
這個類就是一個netty的客戶端
給他一個 server
的IP 埠 讓他建立連線
連線以後 他就需要 動態代理了
在動態代理的過程中
我們呼叫 網路通訊
然後得到 server
給我們的結果
然後返回給 代理物件
ServiceConsumerHandler.java
package cn.bywind.rpc.v_one;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.Callable;
public class ServiceConsumerHandler extends ChannelHandlerAdapter implements Callable{
private ChannelHandlerContext context;
private String result;
private String params;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(params);
wait();
return result;
}
public String getParams() {
return params;
}
public void setParams(String params) {
this.params = params;
}
}
跟上面的那個 ServiceProviderHandler
類似
都是處理 netty 網路資料
他主要實現兩個功能
1. 拿到入參 向netty中寫資料
2. 拿到服務端返回值 返回給程式
我們讓這個類實現 callable 介面
可以被執行緒池呼叫
因為我們會模擬多個客戶端的情形
我們需要我們的 資料獲取 和 資料寫入
是執行緒安全的
這塊的程式碼大家可以 詳細看下
ServiceConsumer.java
package cn.bywind.rpc.v_one;
public class ServiceConsumer {
public static void main(String[] args) throws Exception{
ServiceClient client = new ServiceClient();
Service proxy = (Service) client.createProxy(Service.class);
int i = 0;
for (;;){
Thread.sleep(1000);
String result = proxy.sayHelloWithName("bywind"+i);
System.out.println("from rpc server:"+result);
i++;
}
}
}
這個類就是一個客戶端吧
具體的也沒啥可說的
執行我們的程式
首先我們執行我們的伺服器端程式碼
接下來我們執行我們的客戶端程式碼
多執行緒的哦, 並且我們啟動兩個客戶端
我們正好看下 伺服器是否會返回錯亂,或者活 客戶端是否會錯亂
準確的說是 客戶端是否會錯亂
因為我們的客戶端 實際處理程式碼是 一個執行緒模型的
我們加了 執行緒安全的模型哦
好的現在我們看下效果
獲取服務端的資料完全沒有問題
緊接著我麼啟動第二個客戶端
我們發現他的序號也是從 0 開始
這就實現了不同執行緒中間互補干擾
以上就是我們 1.0 版本的 一個 遠端RPC呼叫
彷彿不是很過癮
這種只能給一個 service
服務了
我們需要的是程式的通用性
可以適配更多
並且在傳遞 引數 和返回引數的時候
我們不想只有 String
了
我們需要 Object
這樣 就更完美了
這也就是 框架的意義所在的
從1.0 升級到 2.0
還是先貼程式碼吧
GoodByeService.java
package cn.bywind.rpc.v_two;
public interface GoodByeService {
String sayGoodbye(Person person);
}
GoodByeServiceImpl.java
package cn.bywind.rpc.v_two;
public class GoodByeServiceImpl implements GoodByeService {
@Override
public String sayGoodbye(Person person) {
return "GoodBye :"+person;
}
}
HelloService.java
package cn.bywind.rpc.v_two;
public interface HelloService {
String sayHelloWithName(String name);
}
HelloServiceImpl.java
package cn.bywind.rpc.v_two;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHelloWithName(String name) {
return "hello "+name;
}
}
Person.java
package cn.bywind.rpc.v_two;
public class Person {
private String name;
private Integer age;
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
RPCDecoder.java
package cn.bywind.rpc.v_two;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class RPCDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RPCDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
final int length = in.readableBytes();
final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
Object obj = SerializationUtil.deserialize(bytes, genericClass);
out.add(obj);
}
}
RPCEncoder.java
package cn.bywind.rpc.v_two;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class RPCEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RPCEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = SerializationUtil.serialize(in);
out.writeBytes(data);
}
}
}
RpcRequest.java
package cn.bywind.rpc.v_two;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
public class RpcRequest implements Serializable {
private String className;
private String methodName;
private Class<?>[] parameterTypes;
private Object [] args;
private String requestId;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public void setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
@Override
public String toString() {
return "RpcRequest{" +
"className='" + className + '\'' +
", methodName='" + methodName + '\'' +
", parameterTypes=" + Arrays.toString(parameterTypes) +
", args=" + Arrays.toString(args) +
", requestId='" + requestId + '\'' +
'}';
}
}
RpcResponse.java
package cn.bywind.rpc.v_two;
import java.io.Serializable;
public class RpcResponse implements Serializable {
private String requestId;
private Object result;
public RpcResponse(){
}
public RpcResponse(String requestId, Object result) {
this.requestId = requestId;
this.result = result;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
SerializationUtil.java
package cn.bywind.rpc.v_two;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SerializationUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();
private static Objenesis objenesis = new ObjenesisStd(true);
private SerializationUtil() {
}
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
if (schema != null) {
cachedSchema.put(cls, schema);
}
}
return schema;
}
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
public static <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = (T) objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
ServiceClient.java
package cn.bywind.rpc.v_two;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServiceClient {
private ServiceConsumerHandler handler ;
private static ExecutorService executor = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public Object createProxy(final Class<?> serviceClass){
Object o = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(handler == null){
startClient();
}
RpcRequest request = new RpcRequest(); // 建立並初始化 RPC 請求
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setArgs(args);
handler.setParams(request);
return executor.submit(handler).get();
}
});
return o;
}
public void startClient(){
handler = new ServiceConsumerHandler();
try {
NioEventLoopGroup worker = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker);
bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new RPCEncoder(RpcRequest.class));
p.addLast(new RPCDecoder(RpcResponse.class));
p.addLast(handler);
}
});
bootstrap.connect("127.0.0.1", 9999).sync();
}catch (Exception e){
e.printStackTrace();
}
}
}
ServiceConsumer.java
package cn.bywind.rpc.v_two;
public class ServiceConsumer {
public static void main(String[] args) throws Exception{
ServiceClient client = new ServiceClient();
HelloService helloService = (HelloService) client.createProxy(HelloService.class);
String bywind = helloService.sayHelloWithName("bywind");
System.out.println("helloService :"+bywind);
GoodByeService goodByeService = (GoodByeService) client.createProxy(GoodByeService.class);
String bywind1 = goodByeService.sayGoodbye(new Person("bywind", 28));
System.out.println("goodByeService : "+bywind1);
}
}
ServiceConsumerHandler.java
package cn.bywind.rpc.v_two;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.Callable;
public class ServiceConsumerHandler extends ChannelHandlerAdapter implements Callable{
private ChannelHandlerContext context;
private RpcResponse rpcResponse;
private Object result;
private RpcRequest params;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
rpcResponse = (RpcResponse) msg;
result = rpcResponse.getResult();
notify();
}
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(params);
wait();
return result;
}
public RpcRequest getParams() {
return params;
}
public void setParams(RpcRequest params) {
this.params = params;
}
}
ServiceProviderHandler.java
package cn.bywind.rpc.v_two;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
public class ServiceProviderHandler extends ChannelHandlerAdapter {
private static final HashMap<String,Object> SERVICE = new HashMap<String, Object>();
static {
SERVICE.put(GoodByeService.class.getName(),new GoodByeServiceImpl());
SERVICE.put(HelloService.class.getName(),new HelloServiceImpl());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("get obj from client:"+msg);
RpcRequest request = (RpcRequest) msg;
Object result = handle(request);
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
response.setResult(result);
ctx.writeAndFlush(response);
}
public Object handle(RpcRequest request){
String className = request.getClassName();
Object object = SERVICE.get(className);
Class<?>[] parameterTypes = request.getParameterTypes();
String methodName = request.getMethodName();
Object[] args = request.getArgs();
Class<?> targetClass = object.getClass();
FastClass fastClass = FastClass.create(targetClass);
FastMethod method = fastClass.getMethod(methodName, parameterTypes);
Object invoke = null;
try {
invoke = method.invoke(object, args);
} catch (InvocationTargetException e) {
e.printStackTrace();
}
return invoke;
}
}
ServiceServer.java
package cn.bywind.rpc.v_two;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class ServiceServer {
private int port = 0;
public ServiceServer(int port) {
this.port = port;
}
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
public void run(){
System.out.println("running on port :"+port);
try {
bootstrap.group(boss,worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new RPCDecoder(RpcRequest.class));
p.addLast(new RPCEncoder(RpcResponse.class));
p.addLast(new ServiceProviderHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind("127.0.0.1", port).sync();
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
public static void main(String[] args) {
ServiceServer serviceServer = new ServiceServer(9999);
serviceServer.run();
}
}
我們來按下第二版的 執行效果吧
還是一樣的 執行 服務端
然後
執行我們的 client
我們看下結果
客戶端結果 輸出
服務端接收引數 輸出:
大家如果想要自己實際執行下程式碼的化
可以去看我的github
https://github.com/ibywind/dubbo-learn
希望大家可以先看一遍 程式碼 然後把 動作記下來
自己去實際 生擼 一個 RPC框架
我學到了什麼
說句實在話
這個東西網上例子很多的
我之所以 想自己嘗試著寫下程式碼
主要為了 防止自己 被 頻繁的 拷貝 貼上
變成老年痴呆
首先分析 原理
這個東西 就是個 代理模式
另外 熟悉了 netty的程式設計模型
其實做起來還是很簡單的.
這個程式碼我上傳到 github
https://github.com/ibywind/dubbo-learn
大家可以下載下來繼續學習和指正哦