基於Netty的分散式 RPC 框架
阿新 • • 發佈:2019-02-16
轉載自:http://blog.csdn.net/z69183787/article/details/52700274
http://blog.csdn.net/z69183787/article/details/52680941
採用Zookeeper、Netty和spring實現了一個輕量級的分散式RPC框架,這個RPC框架可以算是一個簡易版的dubbo。框架雖小,但是麻雀雖小,五臟俱全。
使用瞭如下技術選型:
Spring:依賴注入框架。
Netty:它使 NIO 程式設計更加容易,遮蔽了 Java 底層的 NIO 細節。
Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 檔案。
ZooKeeper:提供服務註冊與發現功能,開發分散式系統的必備選擇。
下面只是些關鍵部分的程式碼示例,具體請參考原作者的git: https://github.com/luxiaoxun/NettyRpc 或則 轉載的原文。
1.使用註解標註要釋出的服務:
2. 定義服務介面和實現:
3. RpcRequest與RpcResponse如下:
4. 實現 RPC 伺服器:
5. RpcDecoder提供 RPC 解碼,只需擴充套件 Netty 的ByteToMessageDecoder抽象類的decode方法即可,RpcEncoder提供 RPC 編碼,只需擴充套件 Netty 的MessageToByteEncoder抽象類的encode方法即可,程式碼如下:
6. 在RpcHandler中將處理 RPC 請求,擴充套件了Netty的SimpleChannelInboundHandler抽象類:
7. 客戶端呼叫服務,使用代理模式呼叫服務:
8. RpcClient類實現 RPC 客戶端:
9.測試:
以上的demo還只是個雛形,還存在一些效能問題需要改進,僅供學習參考之用。
http://blog.csdn.net/z69183787/article/details/52680941
採用Zookeeper、Netty和spring實現了一個輕量級的分散式RPC框架,這個RPC框架可以算是一個簡易版的dubbo。框架雖小,但是麻雀雖小,五臟俱全。
使用瞭如下技術選型:
Spring:依賴注入框架。
Netty:它使 NIO 程式設計更加容易,遮蔽了 Java 底層的 NIO 細節。
Protostuff:它基於 Protobuf 序列化框架,面向 POJO,無需編寫 .proto 檔案。
ZooKeeper:提供服務註冊與發現功能,開發分散式系統的必備選擇。
下面只是些關鍵部分的程式碼示例,具體請參考原作者的git:
1.使用註解標註要釋出的服務:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class<?> value();
}
使用RpcService註解定義在服務介面的實現類上,需要對該實現類指定遠端介面的class,因為實現類可能會實現多個介面,一定要告訴框架哪個才是遠端介面。2. 定義服務介面和實現:
public interface HelloService { …… }
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
……
}
3. RpcRequest與RpcResponse如下:
public class RpcRequest { private String requestId; private String className; private String methodName; private Class<?>[] parameterTypes; private Object[] parameters; // getter/setter... }
public class RpcResponse {
private String requestId;
private Throwable error;
private Object result;
// getter/setter...
}
4. 實現 RPC 伺服器:
public class RpcServer implements ApplicationContextAware, InitializingBean {
private String serverAddress;
private ServiceRegistry serviceRegistry;
private Map<String, Object> handlerMap = new HashMap<>(); // 存放介面名與服務物件之間的對映關係
public RpcServer(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {
this.serverAddress = serverAddress;
this.serviceRegistry = serviceRegistry;
}
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 獲取所有帶有 RpcService 註解的 Spring Bean
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
@Override
public void afterPropertiesSet() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new RpcDecoder(RpcRequest.class)) // 將 RPC 請求進行解碼(為了處理請求)
.addLast(new RpcEncoder(RpcResponse.class)) // 將 RPC 響應進行編碼(為了返回響應)
.addLast(new RpcHandler(handlerMap)); // 處理 RPC 請求
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture future = bootstrap.bind(host, port).sync();
if (serviceRegistry != null) {
serviceRegistry.register(serverAddress); // 註冊服務地址
}
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
這裡的serverAddress是”ip:port”,ServiceRegistry是使用 ZooKeeper實現服務註冊類,將serverAddress在資料節點上。5. RpcDecoder提供 RPC 解碼,只需擴充套件 Netty 的ByteToMessageDecoder抽象類的decode方法即可,RpcEncoder提供 RPC 編碼,只需擴充套件 Netty 的MessageToByteEncoder抽象類的encode方法即可,程式碼如下:
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (dataLength < 0) {
ctx.close();
}
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = SerializationUtil.deserialize(data, genericClass);
out.add(obj);
}
}
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = SerializationUtil.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
SerializationUtil工具類是使用Protostuff來實現序列化,程式碼略過。6. 在RpcHandler中將處理 RPC 請求,擴充套件了Netty的SimpleChannelInboundHandler抽象類:
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {
private final Map<String, Object> handlerMap;
public RpcHandler(Map<String, Object> handlerMap) {
this.handlerMap = handlerMap;
}
@Override
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse response = new RpcResponse();
response.setRequestId(request.getRequestId());
try {
Object result = handle(request);
response.setResult(result);
} catch (Throwable t) {
response.setError(t);
}
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private Object handle(RpcRequest request) throws Throwable {
String className = request.getClassName();
Object serviceBean = handlerMap.get(className);
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
/*Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, parameters);*/
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
為了避免使用 Java 反射帶來的效能問題,使用了CGLib 提供的反射API,即上面用到的FastClass與FastMethod。7. 客戶端呼叫服務,使用代理模式呼叫服務:
public class RpcProxy {
private String serverAddress;
private ServiceDiscovery serviceDiscovery;
public RpcProxy(String serverAddress) {
this.serverAddress = serverAddress;
}
public RpcProxy(ServiceDiscovery serviceDiscovery) {
this.serviceDiscovery = serviceDiscovery;
}
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass) {
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest request = new RpcRequest(); // 建立並初始化 RPC 請求
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
if (serviceDiscovery != null) {
serverAddress = serviceDiscovery.discover(); // 發現服務
}
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
RpcClient client = new RpcClient(host, port); // 初始化 RPC 客戶端
RpcResponse response = client.send(request); // 通過 RPC 客戶端傳送 RPC 請求並獲取 RPC 響應
if (response.isError()) {
throw response.getError();
} else {
return response.getResult();
}
}
}
);
}
}
ServiceDiscovery是Zookeeper實現的服務發現,取出已經註冊過的”ip:port”,然後隨機返回一個”ip:port”。8. RpcClient類實現 RPC 客戶端:
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
private String host;
private int port;
private RpcResponse response;
private final Object obj = new Object();
public RpcClient(String host, int port) {
this.host = host;
this.port = port;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
this.response = response;
synchronized (obj) {
obj.notifyAll(); // 收到響應,喚醒執行緒
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public RpcResponse send(RpcRequest request) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new RpcEncoder(RpcRequest.class)) // 將 RPC 請求進行編碼(為了傳送請求)
.addLast(new RpcDecoder(RpcResponse.class)) // 將 RPC 響應進行解碼(為了處理響應)
.addLast(RpcClient.this); // 使用 RpcClient 傳送 RPC 請求
}
})
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().writeAndFlush(request).sync();
synchronized (obj) {
obj.wait(); // 未收到響應,使執行緒等待
}
if (response != null) {
future.channel().closeFuture().sync();
}
return response;
} finally {
group.shutdownGracefully();
}
}
}
這裡每次呼叫的send時候才去和服務端建立連線,使用的是短連線,這種短連線在高併發時會有連線數問題,也會影響效能。,使用了obj的wait和notifyAll來等待Response返回,會出現“假死等待”的情況:一個Request傳送出去後,在obj.wait()呼叫之前可能Response就返回了,這時候在channelRead0裡已經拿到了Response並且obj.notifyAll()已經在obj.wait()之前呼叫了,這時候send後再obj.wait()就出現了假死等待,客戶端就一直等待在這裡。應該使用CountDownLatch來解決這個問題。9.測試:
RpcProxy rpcProxy = new RpcProxy(serviceDiscovery);
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
以上的demo還只是個雛形,還存在一些效能問題需要改進,僅供學習參考之用。