基於Netty的高性能JAVA的RPC框架
今年7月份左右報名參加了阿裏巴巴組織的高性能中間件挑戰賽,這次比賽不像以往的比賽,是從一個工程的視角來比賽的。
這個比賽有兩個賽題,第一題是實現一個RPC框架,第二道題是實現一個Mom消息中間件。
RPC題目如下
一個簡單的RPC框架
RPC(Remote Procedure Call )——遠程過程調用,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。RPC協議假定某些傳輸協議的存在,如TCP或UDP,為通信程序之間攜帶信息數據。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發包括網絡分布式多程序在內的應用程序更加容易。
框架——讓編程人員便捷地使用框架所提供的功能,由於RPC的特性,聚焦於應用的分布式服務化開發,所以成為一個對開發人員無感知的接口代理,顯然是RPC框架優秀的設計。
1.要成為框架:對於框架的使用者,隱藏RPC實現。
2.網絡模塊可以自己編寫,如果要使用IO框架,要求使用netty-4.0.23.Final。
3.支持異步調用,提供future、callback的能力。
4.能夠傳輸基本類型、自定義業務類型、異常類型(要在客戶端拋出)。
5.要處理超時場景,服務端處理時間較長時,客戶端在指定時間內跳出本次調用。
6.提供RPC上下文,客戶端可以透傳數據給服務端。
7.提供Hook,讓開發人員進行RPC層面的AOP。
註:為了降低第一題的難度,RPC框架不需要註冊中心,客戶端識別-DSIP的JVM參數來獲取服務端IP。
衡量標準
滿足所有要求。 性能測試。
參賽者必須以com.alibaba.middleware.race.rpc.api.impl.RpcConsumerImpl為全類名,繼承com.alibaba.middleware.race.rpc.api.RpcConsumer,並覆寫所有的public方法。
參賽者必須以com.alibaba.middleware.race.rpc.api.impl.RpcProviderImpl為全類名,繼承com.alibaba.middleware.race.rpc.api.RpcProvider,並覆寫所有的public方法。
三方庫裏的代碼起到提示的作用,可以作為參考,不要在最終的pom中依賴。
所以最終參賽者需要打出一個rpc-api的jar包,供測試工程調用。 (註意,參考完rpc-api的示例後,請從pom依賴中將其刪除,避免依賴沖突)
測試Demo工程請參考Taocode SVN上的代碼。
RPC的實現
題目中推薦的網絡框架使用Netty4來實現,這個RPC框架中需要實現的有
- RPC客戶端
- RPC服務端
RPC客戶端的實現
RPC客戶端和RPC服務器端需要一個相同的接口類,RPC客戶端通過一個代理類來調用RPC服務器端的函數
RpcConsumerImpl的實現
......
package com.alibaba.middleware.race.rpc.api.impl;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.middleware.race.rpc.aop.ConsumerHook;
import com.alibaba.middleware.race.rpc.api.RpcConsumer;
import com.alibaba.middleware.race.rpc.async.ResponseCallbackListener;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.netty.RpcConnection;
import com.alibaba.middleware.race.rpc.netty.RpcNettyConnection;
import com.alibaba.middleware.race.rpc.tool.Tool;
public class RpcConsumerImpl extends RpcConsumer implements InvocationHandler {
private static AtomicLong callTimes = new AtomicLong(0L);
private RpcConnection connection;
private List<RpcConnection> connection_list;
private Map<String,ResponseCallbackListener> asyncMethods;
private Class<?> interfaceClass;
private String version;
private int timeout;
private ConsumerHook hook;
public Class<?> getInterfaceClass() {
return interfaceClass;
}
public String getVersion() {
return version;
}
public int getTimeout() {
this.connection.setTimeOut(timeout);
return timeout;
}
public ConsumerHook getHook() {
return hook;
}
RpcConnection select()
{
//Random rd=new Random(System.currentTimeMillis());
int d=(int) (callTimes.getAndIncrement()%(connection_list.size()+1));
if(d==0)
return connection;
else
{
return connection_list.get(d-1);
}
}
public RpcConsumerImpl()
{
//String ip=System.getProperty("SIP");
String ip="127.0.0.1";
this.asyncMethods=new HashMap<String,ResponseCallbackListener>();
this.connection=new RpcNettyConnection(ip,8888);
this.connection.connect();
connection_list=new ArrayList<RpcConnection>();
int num=Runtime.getRuntime().availableProcessors()/3 -2;
for (int i = 0; i < num; i++) {
connection_list.add(new RpcNettyConnection(ip, 8888));
}
for (RpcConnection conn:connection_list)
{
conn.connect();
}
}
public void destroy() throws Throwable {
if (null != connection) {
connection.close();
}
}
@SuppressWarnings("unchecked")
public <T> T proxy(Class<T> interfaceClass) throws Throwable {
if (!interfaceClass.isInterface()) {
throw new IllegalArgumentException(interfaceClass.getName()
+ " is not an interface");
}
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[] { interfaceClass }, this);
}
@Override
public RpcConsumer interfaceClass(Class<?> interfaceClass) {
// TODO Auto-generated method stub
this.interfaceClass=interfaceClass;
return this;
}
@Override
public RpcConsumer version(String version) {
// TODO Auto-generated method stub
this.version=version;
return this;
}
@Override
public RpcConsumer clientTimeout(int clientTimeout) {
// TODO Auto-generated method stub
this.timeout=clientTimeout;
return this;
}
@Override
public RpcConsumer hook(ConsumerHook hook) {
// TODO Auto-generated method stub
this.hook=hook;
return this;
}
@Override
public Object instance() {
// TODO Auto-generated method stub
try {
return proxy(this.interfaceClass);
}
catch (Throwable e)
{
e.printStackTrace();
}
return null;
}
@Override
public void asynCall(String methodName) {
// TODO Auto-generated method stub
asynCall(methodName, null);
}
@Override
public <T extends ResponseCallbackListener> void asynCall(
String methodName, T callbackListener) {
this.asyncMethods.put(methodName, callbackListener);
this.connection.setAsyncMethod(asyncMethods);
for (RpcConnection conn:connection_list)
{
conn.setAsyncMethod(asyncMethods);
}
}
@Override
public void cancelAsyn(String methodName) {
// TODO Auto-generated method stub
this.asyncMethods.remove(methodName);
this.connection.setAsyncMethod(asyncMethods);
for (RpcConnection conn:connection_list)
{
conn.setAsyncMethod(asyncMethods);
}
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
// TODO Auto-generated method stub
List<String> parameterTypes = new LinkedList<String>();
for (Class<?> parameterType : method.getParameterTypes()) {
parameterTypes.add(parameterType.getName());
}
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
if(hook!=null)
hook.before(request);
RpcResponse response = null;
try
{
request.setContext(RpcContext.props);
response = (RpcResponse) select().Send(request,asyncMethods.containsKey(request.getMethodName()));
if(hook!=null)
hook.after(request);
if(!asyncMethods.containsKey(request.getMethodName())&&response.getExption()!=null)
{
Throwable e=(Throwable) Tool.deserialize(response.getExption(),response.getClazz());
throw e.getCause();
}
}
catch (Throwable t)
{
//t.printStackTrace();
//throw new RuntimeException(t);
throw t;
}
finally
{
// if(asyncMethods.containsKey(request.getMethodName())&&asyncMethods.get(request.getMethodName())!=null)
// {
// cancelAsyn(request.getMethodName());
// }
}
if(response==null)
{
return null;
}
else if (response.getErrorMsg() != null)
{
throw response.getErrorMsg();
}
else
{
return response.getAppResponse();
}
}
}
RpcConsumer consumer;
consumer = (RpcConsumer) getConsumerImplClass().newInstance();
consumer.someMethod();123
因為consumer對象是通過代理生成的,所以當consumer調用的時候,就會調用invoke函數,我們就可以把這次本地的函數調用的信息通過網絡發送到RPC服務器然後等待服務器返回的信息後再返回。
服務器實現
RPC服務器主要是在收到RPC客戶端之後解析出RPC調用的接口名,函數名以及參數。
package com.alibaba.middleware.race.rpc.api.impl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import net.sf.cglib.reflect.FastClass;
import net.sf.cglib.reflect.FastMethod;
import com.alibaba.middleware.race.rpc.context.RpcContext;
import com.alibaba.middleware.race.rpc.model.RpcRequest;
import com.alibaba.middleware.race.rpc.model.RpcResponse;
import com.alibaba.middleware.race.rpc.serializer.KryoSerialization;
import com.alibaba.middleware.race.rpc.tool.ByteObjConverter;
import com.alibaba.middleware.race.rpc.tool.ReflectionCache;
import com.alibaba.middleware.race.rpc.tool.Tool;
/**
- 處理服務器收到的RPC請求並返回結果
- @author sei.zz
-
*/
public class RpcRequestHandler extends ChannelInboundHandlerAdapter {//對應每個請求ID和端口好 對應一個RpcContext的Map;
private static Map<String,Map<String,Object>> ThreadLocalMap=new HashMap<String, Map<String,Object>>();
//服務端接口-實現類的映射表
private final Map<String, Object> handlerMap;
KryoSerialization kryo=new KryoSerialization();
public RpcRequestHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;}
@Override
br/>}
@Override
}
@Override
br/>System.out.println("active");
}
@Override
// TODO Auto-generated method stub
System.out.println("disconnected");
}
//更新RpcContext的類容
private void UpdateRpcContext(String host,Map<String,Object> map)
{
if(ThreadLocalMap.containsKey(host))
{
Map<String,Object> local=ThreadLocalMap.get(host);
local.putAll(map);//把客戶端的加進來
ThreadLocalMap.put(host, local);//放回去
for(Map.Entry<String, Object> entry:map.entrySet()){ //更新變量
RpcContext.addProp(entry.getKey(), entry.getValue());
}
}
else
{
ThreadLocalMap.put(host, map);
//把對應線程的Context更新
for(Map.Entry<String, Object> entry:map.entrySet()){
RpcContext.addProp(entry.getKey(), entry.getValue());
}
}}
//用來緩存住需要序列化的結果
private static Object cacheName=null;
private static Object cacheVaule=null;@Override
public void channelRead(
ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest request=(RpcRequest)msg;
String host=ctx.channel().remoteAddress().toString();
//更新上下文
UpdateRpcContext(host,request.getContext());
//TODO 獲取接口名 函數名 參數 找到實現類 反射實現
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try
{
Object result = handle(request);
if(cacheName!=null&&cacheName.equals(result))
{
response.setAppResponse(cacheVaule);
}
else
{
response.setAppResponse(ByteObjConverter.ObjectToByte(result));
cacheName=result;
cacheVaule=ByteObjConverter.ObjectToByte(result);
}
}
catch (Throwable t)
{
//response.setErrorMsg(t);
response.setExption(Tool.serialize(t));
response.setClazz(t.getClass());
}
ctx.writeAndFlush(response);
}/**
- 運行調用的函數返回結果
- @param request
- @return
-
@throws Throwable
*/
private static RpcRequest methodCacheName=null;
private static Object methodCacheValue=null;
private Object handle(RpcRequest request) throws Throwable
{
String className = request.getClassName();Object classimpl = handlerMap.get(className);//通過類名找到實現的類
Class<?> clazz = classimpl.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// Method method = ReflectionCache.getMethod(clazz.getName(),methodName, parameterTypes);
// method.setAccessible(true);
//System.out.println(className+":"+methodName+":"+parameters.length);
if(methodCacheName!=null&&methodCacheName.equals(request))
{
return methodCacheValue;
}
else
{
try
{
methodCacheName=request;
if(methodMap.containsKey(methodName))
{
methodCacheValue= methodMap.get(methodName).invoke(classimpl, parameters);
return methodCacheValue;
}
else
{
FastClass serviceFastClass = FastClass.create(clazz);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
methodMap.put(methodName, serviceFastMethod);
methodCacheValue= serviceFastMethod.invoke(classimpl, parameters);
return methodCacheValue;
}
//return method.invoke(classimpl, parameters);
}
catch (Throwable e)
{
throw e.getCause();
}
}
}
private Map<String,FastMethod> methodMap=new HashMap<String, FastMethod>();
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
//ctx.close();
//cause.printStackTrace();
ctx.close();
}
}
handel函數通過Java的反射機制,找到要調用的接口類然後調用對應函數然後執行,然後返回結果到客戶端,本次RPC調用結束。
RPC主要的實現類在我的github上可以看見,我的這套RPC框架雖說不上完美,但是性能還是挺好的在服務器上測試時TPC有9w+。
主要的優化就是使用Neety4這個框架以及對數據包的處理,數據序列化與反序列化的速度
基於Netty的高性能JAVA的RPC框架