利用Thrift和zk簡單實現服務治理框架中的訂閱釋出機制
本文簡單介紹下利用Thrift和zk簡單實現服務治理框架服務的訂閱釋出機制,類似於Dubbo的服務治理。這個只是簡單版本,只供學習和理解用。
全部程式碼下載:Github連結:github連結,點選驚喜;寫文章不易,歡迎大家採我的文章,以及給出有用的評論,當然大家也可以關注一下我的github;多謝;
1.什麼是服務治理:
1.1微服務簡單介紹:
微服務已經成為當下最熱門的話題之一。它是一種新的架構風格,涉及組織架構、設計、交付、運維等方面的變革,核心目標是為了解決系統的交付週期,並降低維護成本和研發成本。相比傳統的SOA架構或者單塊架構,微服務有很多的優勢,比如技術的多樣性、模組化、獨立部署等,但也帶來了相應的成本,比如運維成本、服務管理成本等。
1.2服務治理的出現
在微服務盛行下,利用RMI或Hessian等工具,簡單的暴露和引用遠端服務,通過配置服務的URL地址進行呼叫已經變得越來越不能滿足需求。
1.服務越來越多時,服務URL配置管理變得非常困難。
2.服務間依賴關係變得錯蹤複雜
3.服務的呼叫量越來越大,服務的容量問題就暴露出來,這個服務需要多少機器支撐?什麼時候該加機器?
4…….等等
為了滿足服務線下管控、保障線上高效執行,需要有一個統一的服務治理框架對服務進行統一、有效管控,保障服務的高效、健康執行。服務治理是分散式服務框架的一個可選特性,儘管從服務開發和執行角度看它不是必須的,但是如果沒有服務治理功能,分散式服務框架的服務SLA很難得到保障,服務化也很難真正實施成功。
基於以上原因,需要對各個服務做治理,這也是就為什麼有了dubbo這類服務治理框架,它與其他RPC框架相比(例如thrift,avro),不僅僅提供了透明的服務呼叫,而且還提供了服務治理,比如上述的呼叫統計管理、負載均衡,這樣每個業務模組只需專注於自己的內部業務邏輯即可。
1.3服務治理的幾個要素:
服務管理元件:這個元件是“服務治理”的核心元件,您的服務治理框架有多強大,主要取決於您的服務管理元件功能有多強大。它至少具有的功能包括:服務註冊管理、訪問路由;另外,它還可以具有:服務版本管理、服務優先順序管理、訪問許可權管理、請求數量限制、連通性管理、註冊服務叢集、節點容錯、事件訂閱-釋出、狀態監控,等等功能。
服務提供者(服務生產者):即服務的具體實現,然後按照服務治理框架特定的規範釋出到服務管理元件中。這意味著什麼呢?這意味著,服務提供者不一定按照RPC呼叫的方式釋出服務,而是按照整個服務治理框架所規定的方式進行釋出(如果服務治理框架要求服務提供者以RPC呼叫的形式進行釋出,那麼服務提供者就必須以RPC呼叫的形式進行釋出;如果服務治理框架要求服務提供者以Http介面的形式進行釋出,那麼服務提供者就必須以Http介面的形式進行釋出,但後者這種情況一般不會出現)。
服務使用者(服務消費者):即呼叫這個服務的使用者,呼叫者首先到服務管理元件中查詢具體的服務所在的位置;服務管理元件收到查詢請求後,將向它返回具體的服務所在位置(視服務管理元件功能的不同,還有可能進行這些計算:判斷服務呼叫者是否有許可權進行呼叫、是否需要生成認證標記、是否需要重新檢查服務提供者的狀態、讓呼叫者使用哪一個服務版本等等)。服務呼叫者在收到具體的服務位置後,向服務提供者發起正式請求,並且返回相應的結果。第二次呼叫時,服務請求者就可以像服務提供者直接發起呼叫請求了(當然,您可以有一個服務提供期限的設定,使用租約協議就可以很好的實現)。
參考於: http://blog.csdn .net/yinwenjie/article/details/49869535
簡單畫了如下圖:
1.4服務的訂閱釋出機制
它的核心理念是實現服務消費者和服務提供者的解耦,讓服務消費者能夠像使用本地介面一樣消費遠端的服務提供者,而不需要關心服務提供者的位置資訊,實現透明化呼叫。常用的服務註冊中心有Zookeeper、ETCD,以及基於資料庫的配置中心。
2.設計一個服務治理框架中的訂閱釋出機制
2.1使用的技術:
2.2設計思路
1.利用Zookeeper建立/Service根目錄,在該目錄下建立相應的服務介面子目錄存放該介面的IP地址和埠號—註冊服務
2.利用Thrift建立服務和啟動服務
3.利用Zookeeper去對應目錄/Service訂閱相應服務獲得介面的IP地址和埠號,並註冊監聽事件,當目錄改變時更新介面的IP地址和埠號—訂閱服務
3.實現訂閱釋出機制
3.1實現步驟:
1.編寫Thrift的IDL並編譯出相應的介面類。
2.實現相應的介面。
3.編寫服務啟動和註冊服務類。
4.編寫相應的客戶端訂閱服務。
3.2程式碼實現
工程為maven工程,假如不建立maven工程,請下載對應的lib包。
具體實現原理,見註釋:
1.IDL檔案和編譯:
//名稱空間定義:java包
namespace java cn.wpeace.thrift
//結構體定義:轉化java中的實體類
struct Request{
1:required string userName;
2:required string password;
}
//定義返回型別
struct Student{
1:required string naem;
2:required i32 age;
}
struct People{
1:required string naem;
2:required i32 age;
3:required string sex;
}
//異常描述定義
exception HelloException{
1:required string msg;
}
//服務定義,生成介面用
service StudentService{
list<Student> getAllStudent(1:Request request)throws (1:HelloException e);
}
//服務定義,生成介面用
service PeopleService{
list<People> getAllPeople(1:Request request)throws (1:HelloException e);
}
//thrift -gen java ./zk.thrift
2.實現相應介面:
public class StudentServiceImpl implements Iface {// 實現的是StudentService類下面的介面
@Override
public List<Student> getAllStudent(Request request) throws HelloException, TException {
System.out.println("呼叫studentService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<Student> students = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Student student = new Student();
student.setNaem("peace" + i);
student.setAge(22 + i);
students.add(student);
}
return students;
}
}
public class PeopleServiceImpl implements Iface{
@Override
public List<People> getAllPeople(Request request) throws HelloException, TException {
System.out.println("呼叫PeopleService");
System.out.println(request.getUserName());
System.out.println(request.getPassword());
List<People>peoples=new ArrayList<>();
for(int i=0;i<5;i++)
{
People people=new People("wpeace", 22+i, "男");
peoples.add(people);
}
return peoples;
}
}
3.實現服務啟動和註冊類
package cn.wpeace.thriftService;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.StudentService;
import net.sf.json.JSONObject;
public class ServiceSatrt implements Watcher{
//初始化log4j
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ServiceSatrt.class);
private static final Integer[] PORTS={8081,8082};
public static final String serviceNames[]={"studentService","peopleService"};
private static final String SERVICE_IP="192.168.1.118";
private CountDownLatch connectedSignal=new CountDownLatch(1);//用於建立連線
private ZooKeeper zk ;
/**
* thrift服務啟動標記
*/
private Integer isThriftStart=0;
/**
* 啟動所有服務
*/
private void startServer(){
ServiceSatrt.LOGGER.info("啟動Thrift執行緒");
// 建立啟動執行緒:
StartServerThread studenThread = new StartServerThread(PORTS[0],
new StudentService.Processor<StudentService.Iface>(new StudentServiceImpl()));
StartServerThread peopleThread = new StartServerThread(PORTS[1],
new PeopleService.Processor<PeopleService.Iface>(new PeopleServiceImpl()));
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(studenThread);
pool.submit(peopleThread);
//關閉執行緒池:執行緒仍然在執行
pool.shutdown();
}
private class StartServerThread implements Runnable{
private Integer port;
private TProcessor processor;
public StartServerThread(Integer port,TProcessor processor) {
this.port=port;
this.processor=processor;
}
@Override
public void run() {
ServiceSatrt.LOGGER.info("thrift服務正在準備啟動");
try {
// 非阻塞式
TNonblockingServerSocket serverSocket=new TNonblockingServerSocket(port);
// 為伺服器設定對應的IO網路模型
TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket);
// 設定控制器
tArgs.processor(processor);
// 設定訊息封裝格式
tArgs.protocolFactory(new TBinaryProtocol.Factory());//Thrift特有的一種二進位制描述格式
// 啟動Thrift服務
TNonblockingServer server = new TNonblockingServer(tArgs);
server.setServerEventHandler(new StartServerEventHander());
server.serve();//啟動後,程式就停在這裡了。
} catch (TTransportException e) {
e.printStackTrace();
}
}
}
private class StartServerEventHander implements TServerEventHandler{
@Override
public void preServe() {
synchronized (isThriftStart) {
isThriftStart++;//當全部服務啟動成功才連線zk
if(isThriftStart==2){
synchronized (ServiceSatrt.this) {
ServiceSatrt.LOGGER.info("thrift服務啟動完成");
ServiceSatrt.this.notify();
}
}
}
}
@Override
public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
return null;
}
@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
}
@Override
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
}
}
private void connectZk() throws KeeperException, InterruptedException, IOException{
// 連線到zk伺服器叢集,新增預設的watcher監聽
zk= new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
// 建立一個父級節點Service
Stat pathStat = null;
try {
pathStat = zk.exists("/Service", false);
// 如果條件成立,說明節點不存在(只需要判斷一個節點的存在性即可)
// 建立的這個節點是一個“永久狀態”的節點
if (pathStat == null) {
zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
System.exit(-1);
}
// 開始新增子級節點,每一個子級節點都表示一個這個服務提供者提供的業務服務
for (int i = 0; i < 2; i++) {
JSONObject nodeData = new JSONObject();
nodeData.put("ip", SERVICE_IP);
nodeData.put("port", PORTS[i]);
zk.create("/Service/" + serviceNames[i], nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
}
// 執行到這裡,說明所有的service都啟動完成了
ServiceSatrt.LOGGER.info("===================所有service都啟動完成了,主執行緒開始啟動===================");
}
@Override
public void process(WatchedEvent event) {
//建立連線用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//暫在這裡不做處理,正常情況下需要處理。
}
public static void main(String[] args) {
//啟動服務
ServiceSatrt serviceSatrt=new ServiceSatrt();
serviceSatrt.startServer();
//等待服務啟動完成
synchronized (serviceSatrt) {
try {
while (serviceSatrt.isThriftStart<2) {
serviceSatrt.wait();
}
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
//啟動連線
try {
serviceSatrt.connectZk();
} catch (Exception e) {
ServiceSatrt.LOGGER.error(e);
System.out.println(-1);
}
}
}
4.編寫客戶端類:
package cn.wpeace.thriftClinet;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.sound.midi.VoiceStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import cn.wpeace.thrift.People;
import cn.wpeace.thrift.PeopleService;
import cn.wpeace.thrift.Request;
import cn.wpeace.thrift.Student;
import cn.wpeace.thrift.StudentService;
import cn.wpeace.thriftService.ServiceSatrt;
import net.sf.json.JSONObject;
public class ThriftClinet implements Watcher{
static{
BasicConfigurator.configure();
}
private static final Log LOGGER=LogFactory.getLog(ThriftClinet.class);
private String serverIp;
private String serverPort;
private String servername;
private CountDownLatch connectedSignal=new CountDownLatch(1);//用於建立連線
private ZooKeeper zk;
private void init(String servername) throws IOException, KeeperException, InterruptedException{
// 連線到zk伺服器叢集,新增預設的watcher監聽
this.zk = new ZooKeeper("192.168.1.127:2181", 120000, this);
connectedSignal.await();
this.servername=servername;
updateServer();
ThriftClinet.LOGGER.info("初始化完成");
}
/**
* 從zk上獲取Service中的節點資料:包括IP和埠
* @throws KeeperException
* @throws InterruptedException
*/
private void updateServer() throws KeeperException, InterruptedException {
this.serverIp=null;
this.serverPort=null;
/*
*
* 判斷服務根節點是否存在
*/
Stat pathStat = null;
try {
pathStat = this.zk.exists("/Service", false);
// 如果條件成立,說明節點不存在
// 建立的這個節點是一個“永久狀態”的節點
if (pathStat == null) {
ThriftClinet.LOGGER.info("客戶端創立Service");
this.zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
return;
}
} catch (Exception e) {
ThriftClinet.LOGGER.error(e);
System.exit(-1);
}
// 獲取服務列表
List<String> serviceList = this.zk.getChildren("/Service", false);
if (serviceList == null || serviceList.isEmpty()) {
ThriftClinet.LOGGER.info("未發現相關服務,客戶端退出");
return;
}
// 查詢所需的服務是否存在
boolean isFound = false;
byte[] data;// 獲取節點資料
for (String name : serviceList) {
if (StringUtils.equals(name, this.servername)) {
isFound = true;
break;// 找到一個就退出
}
}
// 獲得資料
if (isFound) {
data = this.zk.getData("/Service/" + this.servername, false, null);
} else {
ThriftClinet.LOGGER.info("未發現相關服務,客戶端退出");
return;
}
if (data == null || data.length == 0) {
ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
return;
}
JSONObject fromObject = JSONObject.fromObject(new String(data));
this.serverIp = fromObject.getString("ip");
this.serverPort = fromObject.getString("port");
}
@Override
public void process(WatchedEvent event) {
//建立連線用
if(event.getState()==KeeperState.SyncConnected){
connectedSignal.countDown();
return;
}
//如果發生 Service下的節點變換,就更新ip和埠
if (event.getType() == EventType.NodeChildrenChanged
&& "/Service".equals(event.getPath())) {
try {
updateServer();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ThriftClinet studentClinet=new ThriftClinet();
ThriftClinet peopleClinet=new ThriftClinet();
/**
* studnetService 測試
*/
try {
studentClinet.init(ServiceSatrt.serviceNames[0]);
if(studentClinet.serverIp==null||studentClinet.serverPort==null){
ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
}
//如果是非阻塞型 需要使用
TTransport tSocket = new TFramedTransport(new TSocket(studentClinet.serverIp,
Integer.parseInt(studentClinet.serverPort), 30000));
//設定封裝協議
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立呼叫client
StudentService.Client client=new StudentService.Client(protocol);
//設定呼叫引數:
Request request=new Request().setUserName("peace").setPassword("123456");
//準備傳輸
tSocket.open();
//正式呼叫介面
List<Student> allStudent = client.getAllStudent(request);
//請求結束,斷開連線
tSocket.close();
for(Student student:allStudent)
{
System.out.println(student.getNaem()+":"+student.getAge());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出現異常,客戶端退出");
}
/**
* PeopleService測試
*/
try {
peopleClinet.init(ServiceSatrt.serviceNames[1]);
if(peopleClinet.serverIp==null||peopleClinet.serverPort==null){
ThriftClinet.LOGGER.info("沒有發現有效資料,客戶端退出");
}
//如果是非阻塞型 需要使用
TTransport tSocket = new TFramedTransport(new TSocket(peopleClinet.serverIp,
Integer.parseInt(peopleClinet.serverPort), 30000));
//設定封裝協議
TBinaryProtocol protocol = new TBinaryProtocol(tSocket);
//建立呼叫client
PeopleService.Client client=new PeopleService.Client(protocol);
//設定呼叫引數:
Request request=new Request().setUserName("peace").setPassword("123456");
//準備傳輸
tSocket.open();
//正式呼叫介面
List<People> allPeople = client.getAllPeople(request);
//請求結束,斷開連線
tSocket.close();
for(People people:allPeople)
{
System.out.println(people.getNaem()+":"+people.getAge()+"性別"+people.getSex());
}
} catch (Exception e) {
ThriftClinet.LOGGER.info("出現異常,客戶端退出");
}
}
}
所有程式碼下載請見github,上面的連結。
3.3測試步驟:
1.啟動ServiceSatrt類
2.啟動ThriftClinet類
3.測試結果:
服務端:
客戶端:
本文來自伊豚(blog.wpeace.cn)