微服務 Dubbo + Zookeeper 原理解析
補充:2018-04-20
值得一說的是:下方的 “透明” 是通過 動態代理對 “負載均衡和容錯”的封裝 。
此圖配合下方案例程式碼可以更好的理解 分散式服務框架-RPC原理。
協議:
說明 :內容為小編個人見解,同時做備忘用
基礎準備 : java Socket , serverSocket , RPC 協議。
(1) 網路通訊資料傳輸靠的就是 IO 流(byte[] 位元組) 。
(2) RPC 協議是指 : 利用tcp 通訊,對byte[] 加上自己的規則, 服務端和客戶端可以通過此規則進行資料的封裝和解析。
dubbo + zookeeper 程式碼演練
服務提供者 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId >dubbo-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>dubbo</artifactId>
<version>2.5.3</version>
</dependency >
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>
provider.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 提供方應用資訊,用於計算依賴關係 -->
<dubbo:application name="hello-world-app" />
<!-- 使用zookeeper註冊中心暴露服務地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />
<!-- 用dubbo協議在20880埠暴露服務 -->
<!-- serialization 協議序列化方式,當協議支援多種序列化方式時使用,,預設hessian2
比如:dubbo協議的dubbo,hessian2,java,compactedjava,以及http協議的json等
-->
<dubbo:protocol name="dubbo" port="20880" serialization="fastjson"/>
<!-- 宣告需要暴露的服務介面 -->
<dubbo:service interface="com.tony.test.dubbo.DemoService" ref="demoService" />
<!-- 和本地bean一樣實現服務 -->
<bean id="demoService" class="com.tony.test.dubbo.provider.DemoServiceImpl" />
</beans>
服務介面 :
package com.tony.test.dubbo;
public interface DemoService {
String sayHello(User user, String mark);
}
子類 :
package com.tony.test.dubbo.provider;
import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;
public class DemoServiceImpl implements DemoService {
public String sayHello(User user, String mark) {
System.out.println("進來了----");
return "agui>" + mark + " name>" + user.getName();
}
}
main 入口 :
或者直接啟動tomcat
package com.tony.test.dubbo.provider;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ProviderMain {
// 服務提供者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { "provider.xml" });
context.start();
System.in.read();
}
}
服務消費者
pom.xml 同上 , 介面(interface)同上 。
consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<!-- 消費方應用名,用於計算依賴關係,不是匹配條件,不要與提供方一樣 -->
<dubbo:application name="consumer-of-helloworld-app" />
<!-- 使用multicast廣播註冊中心暴露發現服務地址 -->
<dubbo:registry address="zookeeper://localhost:2181" />
<!-- 生成遠端服務代理,可以和本地bean一樣使用demoService -->
<dubbo:reference id="demoService" interface="com.tony.test.dubbo.DemoService" timeout="30000"/>
</beans>
main 入口
或者啟動tomcat
package com.tony.test.dubbo.consumer;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.tony.test.dubbo.DemoService;
import com.tony.test.dubbo.User;
public class ConsumerMain {
// 服務消費者
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "consumer.xml" });
context.start();
DemoService demoService = (DemoService) context.getBean("demoService");
User user = new User();
user.setName("agui");
String hello = demoService.sayHello(user, "2");
System.out.println("RPC呼叫結果:" + hello);
}
}
以上就是 dubbo + zookeeper 簡單demo 實現。
接下來講解一下實現原理
之前提到過:帶有規則的byte[] 位元組 ,那麼阿里的dubbo 對bute[] 規則 如圖 :
基本結構分為:header , body 兩部分,詳細內容參考上圖。
整體流程 :
1) 啟動配置檔案,spring bean 容器載入我們的介面實現類,服務提供者封裝serverSocket , 並通過 zookeeper客戶端在 zookeeper中建立節點, 節點中記錄服務端的ip, 埠,暴露的介面等資訊。
2) 客戶端通過zookeeper客戶端拿到服務端ip , 埠等資訊。
3)構建header 和 body 的位元組陣列(byte) , 通過socket 拿到輸出流,write() 方法輸出位元組陣列。
4)服務端拿到輸入流之後,會得到介面資訊,呼叫的方法名稱等,可以通過spring的bean 工廠類,拿到 注入到bean 中的 該介面子類物件,然後去掉用具體的方法,拿到返回值後可以進行序列化,在用 socket 和 serverSocket 建立的管道 輸出流 輸出 子類方法返回的值,此時 http 請求,響應完畢。
說明:http 通訊位元組流,轉為字元流,反序列化可以用阿里的
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
模擬消費端原始碼 :
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>tony.test</groupId>
<artifactId>tony-dubbo-client</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>
byte[]原資料封裝類 :
package com.agui.test.consumer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import com.alibaba.fastjson.JSON;
/**
* 發起dubbo呼叫請求
*/
public class AguiDubboRpcRequest {
String host; // 服務提供者IP
int port; // 服務提供者埠
// 協議中header段
ByteBuffer header = ByteBuffer.allocate(16);
// 協議中body段
StringBuffer rpcBody = new StringBuffer();
public TonyDubboRpcRequest(String host, int port) {
this.host = host;
this.port = port;
// 魔數 da bb
header.put((byte) 0xda);
header.put((byte) 0xbb);
// 標識 固定為 C6
header.put((byte) 0xC6);
// 響應狀態 ,這裡沒有
header.put((byte) 0x00);
// 訊息ID 編號傳進來
// 資料長度 這個要在最後計算
}
/** 需要是完整的類名 */
public TonyDubboRpcRequest dubboVersion(String dubboVersion) {
rpcBody.append(JSON.toJSONString(dubboVersion));
rpcBody.append("\r\n");
return this;
}
/** 需要是完整的類名 */
public TonyDubboRpcRequest path(String path) {
rpcBody.append(JSON.toJSONString(path));
rpcBody.append("\r\n");
return this;
}
/** 服務版本號 */
public TonyDubboRpcRequest serviceVersion(String serviceVersion) {
rpcBody.append(JSON.toJSONString(serviceVersion));
rpcBody.append("\r\n");
return this;
}
/** 方法名 */
public TonyDubboRpcRequest method(String method) {
rpcBody.append(JSON.toJSONString(method));
rpcBody.append("\r\n");
return this;
}
/** 引數型別 */
public TonyDubboRpcRequest desc(String desc) {
rpcBody.append(JSON.toJSONString(desc));
rpcBody.append("\r\n");
return this;
}
/** 引數值 */
public TonyDubboRpcRequest values(Object... values) {
for (Object value : values) {
rpcBody.append(JSON.toJSONString(value));
rpcBody.append("\r\n");
}
return this;
}
/** 隱式傳參 */
public TonyDubboRpcRequest attachments(Object attachment) {
rpcBody.append(JSON.toJSONString(attachment));
rpcBody.append("\r\n");
return this;
}
/** 補齊header */
public TonyDubboRpcRequest build(long msgId) {
// 訊息ID
byte[] msgIdBytes = new byte[8];
long2bytes(msgId, msgIdBytes, 0);
this.header.put(msgIdBytes);
// 計算body長度
int length = this.rpcBody.toString().getBytes().length;
byte[] lenBytes = new byte[4];
int2bytes(length, lenBytes, 0);
this.header.put(lenBytes);
return this;
}
/** int轉4位元組陣列 */
private void int2bytes(int v, byte[] b, int off) {
b[off + 3] = (byte) v;
b[off + 2] = (byte) (v >>> 8);
b[off + 1] = (byte) (v >>> 16);
b[off + 0] = (byte) (v >>> 24);
}
/** long轉8位元組陣列 */
private void long2bytes(long v, byte[] b, int off) {
b[off + 7] = (byte) v;
b[off + 6] = (byte) (v >>> 8);
b[off + 5] = (byte) (v >>> 16);
b[off + 4] = (byte) (v >>> 24);
b[off + 3] = (byte) (v >>> 32);
b[off + 2] = (byte) (v >>> 40);
b[off + 1] = (byte) (v >>> 48);
b[off + 0] = (byte) (v >>> 56);
}
public String run() throws Exception {
System.out.println("###########輸出rcpBody內容:##########");
System.out.println(this.rpcBody.toString());
System.out.println("###########結束輸出###########");
Socket socket = new Socket(this.host, this.port);
try {
OutputStream rpcOPS = socket.getOutputStream();
// 發起呼叫請求
rpcOPS.write(header.array());
rpcOPS.write(this.rpcBody.toString().getBytes());
// 輸出RPC呼叫結果
InputStream rpcRsp = socket.getInputStream();
byte[] header = new byte[16];
rpcRsp.read(header);
byte[] resp = new byte[1024];
rpcRsp.read(resp);
// 忽略header,取body
return new String(resp);
} catch (Exception e) {
e.printStackTrace();
} finally {
socket.close();
}
return null;
}
}
程式入口 :
package com.agui.test.consumer;
import com.alibaba.fastjson.JSONObject;
public class AguiConsumerMain {
// 手寫dubbo客戶端
public static void main(String[] args) throws Exception {
// TODO 配置註冊中心
// TODO 獲取服務資訊
// 本例項重點在於分析dubbo rpc協議內容,故此消費者假定已經知曉服務端的資訊。
JSONObject user = JSONObject.parseObject("{\"name\":\"tony\"}");
TonyDubboRpcRequest tonyDubboRpcRequest = new TonyDubboRpcRequest("127.0.0.1", 20880);
tonyDubboRpcRequest
.dubboVersion("2.5.3")
.path("com.tony.test.dubbo.DemoService")
.serviceVersion("0.0.0.0")
.method("sayHello")
.desc("Lcom/tony/test/dubbo/User;Ljava/lang/String;")
.values(user, "測試tony_1")
.attachments(JSONObject.parseObject("{\"path\":\"com.tony.test.dubbo.DemoService\",\"interface\":\"com.tony.test.dubbo.DemoService\",\"version\":\"0.0.0\"}"))
.build(3);
String result = tonyDubboRpcRequest.run();
System.out.println("RPC呼叫結果:");
System.out.println(result);
}
}
執行程式的時候需要啟動上面的:服務提供者,zookeeper 。