高可用的池化 Thrift Client 實現(原始碼分享)
本文將分享一個高可用的池化 Thrift Client 及其原始碼實現,歡迎閱讀原始碼(Github)並使用,同時歡迎提出寶貴的意見和建議,本人將持續完善。
本文的主要目標讀者是對 Thrift 有一定了解並使用的童鞋,如對 Thrift 的基礎知識瞭解不多或者想重溫一下基礎知識,推薦先閱讀本站文章《和 Thrift 的一場美麗邂逅》。
下面進入正題。
為什麼我們需要這麼一個元件?
我們知道,Thrift 是一個 RPC 框架體系,可以非常方便的進行跨語言 RPC 服務的開發和呼叫。然而,它並沒有提供針對多個 Server 的 Smart Client【1】。比如,你有一個服務 service,分別部署在 116.31.1.1 和 116.31.1.2 兩臺伺服器上,當你需要從 Client 端呼叫該 service 的某個遠端方法的時候,你只能在程式碼中顯式指定使用 116.31.1.1 或者 116.31.1.2 其中的一個。這種情況下,你呼叫的時候無法預知所指定 IP 對應的服務是否可用,並且當該服務不可用時,無法隱式自動切換到呼叫另外一個 IP 對應的服務。也就是說,服務的狀態對你並不是透明的,並且無法做到服務的負載均衡和高可用。
此外,當你呼叫遠端方法時,每次你都得新建一個連線,當請求量很大時,不斷的建立、刪除連線所耗費的服務資源是巨大的。
因此,我們需要這麼一個元件,使服務狀態透明化並底層實現負載均衡和高可用,讓你可以專注於業務邏輯的實現,提升工作效率和服務的質量。下面我們就對該元件(ThrifJ)進行詳細的剖析。
它到底能做些什麼?
特性
- 鏈式呼叫API,簡潔直觀
- 完善的預設配置,無需擔心呼叫時配置不全導致拋錯
- 池化連線物件,高效管理連線的生命週期
- 異常服務自動隔離與恢復
- 多種可配置的負載均衡策略,支援隨機、輪詢、權重和雜湊
- 多種可配置的服務級別,並自動根據服務級別進行服務降級
該如何使用它?
目前最新版本為1.0.1(點此關注最新版本的更新),首先在專案中引入 thriftj-1.0.1.jar,或在 Maven 依賴中加入:
<dependency> <groupId>com.github.cyfonly</groupId> <artifactId>thriftj</artifactId> <version>1.0.1</version> </dependency>
需要注意的是,ThriftJ 基於 slf4j 構建,因此你需要在專案中增加具體日誌實現的依賴,比如 log4j 或 logback。
然後在專案中,參照以下這段程式碼進行呼叫:
//Thrift server 列表 private static final String servers = "127.0.0.1:10001,127.0.0.1:10002"; //TTransport 驗證器 ConnectionValidator validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; //連線物件池配置 GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig(); //failover 策略 FailoverStrategy failoverStrategy = new FailoverStrategy(); //構造 ThriftClient 物件並配置 final ThriftClient thriftClient = new ThriftClient(); thriftClient.servers(servers) .loadBalance(Constant.LoadBalance.RANDOM) .connectionValidator(validator) .poolConfig(poolConfig) .failoverStrategy(failoverStrategy) .connTimeout(5) .backupServers("") .serviceLevel(Constant.ServiceLevel.NOT_EMPTY) .start(); //列印從 ThriftClient 獲取到的可用服務列表 List<ThriftServer> servers = thriftClient.getAvailableServers(); for(ThriftServer server : servers){ System.out.println(server.getHost() + ":" + server.getPort()); } //服務呼叫 if(servers.size()>0){ try{ TestThriftJ.Client client = thriftClient.iface(TestThriftJ.Client.class); QryResult result = client.qryTest(1); System.out.println("result[code=" + result.code + " msg=" + result.msg + "]"); }catch(Throwable t){ logger.error("-------------exception happen", t); } }
友情提示:除 servers 必須配置外,其他配置均為可選(使用預設配置)
它是如何設計並實現的呢?
整體設計
連線池物件工廠及連線物件的管理
基於 commons-pool2 中的 KeyedPooledObjectFactory,以 ThriftServer 為 key,TTransport 為 value 進行實現。關鍵程式碼如下:
@Override public PooledObject<TTransport> makeObject(ThriftServer thriftServer) throws Exception { TSocket tsocket = new TSocket(thriftServer.getHost(), thriftServer.getPort()); tsocket.setTimeout(timeout); TFramedTransport transport = new TFramedTransport(tsocket); transport.open(); DefaultPooledObject<TTransport> result = new DefaultPooledObject<TTransport>(transport); logger.trace("Make new thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); return result; } @Override public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) { boolean isValidate; try { if (failoverChecker == null) { isValidate = pooledObject.getObject().isOpen(); } else { ConnectionValidator validator = failoverChecker.getConnectionValidator(); isValidate = pooledObject.getObject().isOpen() && (validator == null || validator.isValid(pooledObject.getObject())); } } catch (Throwable e) { logger.warn("Fail to validate tsocket: {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); isValidate = false; } if (failoverChecker != null && !isValidate) { failoverChecker.getFailoverStrategy().fail(thriftServer); } logger.info("ValidateObject isValidate:{}", isValidate); return isValidate; } @Override public void destroyObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) throws Exception { TTransport transport = pooledObject.getObject(); if (transport != null) { transport.close(); logger.trace("Close thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort()); } }
在使用連線物件時,根據使用者的自定義連線池配置建立連線池,並實現連線物件的獲取、回池、清除以及連線池的關閉操作。關鍵程式碼如下:
public DefaultThriftConnectionPool(KeyedPooledObjectFactory<ThriftServer, TTransport> factory, GenericKeyedObjectPoolConfig config) { connections = new GenericKeyedObjectPool<>(factory, config); } @Override public TTransport getConnection(ThriftServer thriftServer) { try { return connections.borrowObject(thriftServer); } catch (Exception e) { logger.warn("Fail to get connection for {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e}); throw new RuntimeException(e); } } @Override public void returnConnection(ThriftServer thriftServer, TTransport transport) { connections.returnObject(thriftServer, transport); } @Override public void returnBrokenConnection(ThriftServer thriftServer, TTransport transport) { try { connections.invalidateObject(thriftServer, transport); } catch (Exception e) { logger.warn("Fail to invalid object:{},{}", new Object[] { thriftServer, transport, e }); } } @Override public void close() { connections.close(); } @Override public void clear(ThriftServer thriftServer) { connections.clear(thriftServer); }
異常服務自動隔離與恢復
需要實現服務狀態的透明化,就必須在底層實現服務的監測、隔離和恢復。在 ThriftJ 中,呼叫 ThriftClient 時會啟動一個執行緒對服務進行非同步監測,使用者可以指定檢驗規則(對應配置為 ConnectionValidator)以及 failover 策略(對應配置為 FailoverStrategy,可以指定失敗的次數、失效持續時間和恢復持續時間)。預設情況下,服務驗證規則為判斷 TTransport 是否處於開啟狀態,即:
if (this.validator == null) { this.validator = new ConnectionValidator() { @Override public boolean isValid(TTransport object) { return object.isOpen(); } }; }
而預設的 failover 策略為
- 失敗次數:10(次),表示通過 ConnectionValidator 檢驗失敗 10 次後才考慮將該服務失效,需要配合失效持續時間一起使用
- 時效持續時間:1(分鐘),表示在一個檢驗週期內,首次檢驗失敗的時間持續達到該值後才考慮將該服務失效,配合失敗次數一起使用
- 恢復持續時間:1(分鐘),表示在判定某服務失效並隔離後,經過該值後將服務重新恢復
以上功能基於 Guava cache 實現,關鍵程式碼如下:
/** * 使用預設 failover 策略 */ public FailoverStrategy() { this(DEFAULT_FAIL_COUNT, DEFAULT_FAIL_DURATION, DEFAULT_RECOVER_DURATION); } /** * 自定義 failover 策略 * @param failCount 失敗次數 * @param failDuration 失效持續時間 * @param recoverDuration 恢復持續時間 */ public FailoverStrategy(final int failCount, long failDuration, long recoverDuration) { this.failDuration = failDuration; this.failedList = CacheBuilder.newBuilder().weakKeys().expireAfterWrite(recoverDuration, TimeUnit.MILLISECONDS).build(); this.failCountMap = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<T, EvictingQueue<Long>>() { @Override public EvictingQueue<Long> load(T key) throws Exception { return EvictingQueue.create(failCount); } }); } public void fail(T object) { logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort()); boolean addToFail = false; try { EvictingQueue<Long> evictingQueue = failCountMap.get(object); synchronized (evictingQueue) { evictingQueue.add(System.currentTimeMillis()); if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) { addToFail = true; } } } catch (ExecutionException e) { logger.error("Ops.", e); } if (addToFail) { failedList.put(object, Boolean.TRUE); logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort()); } } public Set<T> getFailed() { return failedList.asMap().keySet(); }
負載均衡
ThriftJ 提供了四種可選的負載均衡策略:
- 隨機
- 輪詢
- 權重
- 雜湊
在使用者不顯式指定的情況下,預設採用隨機演算法。具體演算法的實現在此就不再進行過多的描述了。
需要注意的是,ThriftJ 嚴格規範了呼叫的語義,比如使用雜湊策略時,必須要指定 hash key;當使用非雜湊的其他策略時,一定不能指定 key,避免造成理解的二義性。
服務級別與服務降級
ThriftJ 提供了多種可配置的服務級別,並根據服務級別進行服務降級處理,其對應關係如下:
- SERVERS_ONLY:最高級別,僅返回配置的 servers 列表中可用的服務
- ALL_SERVERS:中等級別,當 servers 列表中的服務全部不可用時,返回 backupServers 列表中的可用服務
- NOT_EMPTY:最低級別,當 servers 和 backupServers 列表中的服務全部不可用時,返回 servers 列表中的所有服務
其中 ThriftJ 預設使用的服務級別是 NOT_EMPTY。服務降級處理的關鍵程式碼如下:
private List<ThriftServer> getAvailableServers(boolean all) { List<ThriftServer> returnList = new ArrayList<>(); Set<ThriftServer> failedServers = failoverStrategy.getFailed(); for (ThriftServer thriftServer : serverList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } if (this.serviceLevel == Constant.ServiceLevel.SERVERS_ONLY) { return returnList; } if ((all || returnList.isEmpty()) && !backupServerList.isEmpty()) { for (ThriftServer thriftServer : backupServerList) { if (!failedServers.contains(thriftServer)) returnList.add(thriftServer); } } if (this.serviceLevel == Constant.ServiceLevel.ALL_SERVERS) { return returnList; } if(returnList.isEmpty()){ returnList.addAll(serverList); } return returnList; }
我還有話要說
技術的提升源自無私的分享,好的技術或工具分享出來,並不會讓自己失去什麼,反而可以在大家共同研究和溝通後使之獲得更好的完善。不要擔心自己寫的工具不夠好,不要害怕自己的技術不夠牛,誰能一步就登天呢?
請熱愛你的熱愛!
【1】Smart Client:比如 MongoClient,可自動發現叢集服務節點、自動故障轉移和負載均衡。
相關推薦
高可用的池化 Thrift Client 實現(原始碼分享)
本文將分享一個高可用的池化 Thrift Client 及其原始碼實現,歡迎閱讀原始碼(Github)並使用,同時歡迎提出寶貴的意見和建議,本人將持續完善。 本文的主要目標讀者是對 Thrift 有一定了解並使用的童鞋,如對 Thrift 的基礎知識瞭解不多或者想重溫一下基礎知識,推薦先閱讀本站文章《和
ActiveMQ 高可用集群安裝、配置(ZooKeeper + LevelDB)
訪問 wrap 創建 管理 apache link over love 其他 ActiveMQ 高可用集群安裝、配置(ZooKeeper + LevelDB) 1、ActiveMQ 集群部署規劃: 環境: JDK7 版本:ActiveMQ 5.11.1 ZooKeep
Android自定義控制元件:Android L控制元件點選水波紋的實現(原始碼 + Demo)
Demo: 一、控制元件的流程: 大致上如下,實際是有些偏差的大家可以自己畫畫 RevealLayout()--->init()--->onMeasure()--->onLayout()--->onDraw()--->dispat
CocosCreator之KUOKUO帶你做瘋狂木板-過橋(2)(原始碼分享)
本次引擎2.0.5 編輯工具VSCode 目標: 第二部分,過橋與得分。 接著上一節: 我們實現了木板的變長與下落。 現在我們實現一個牛逼的平臺,怎麼說它牛逼呢?我準備全程就用它一個來完成所有功能。 看KUOKUO怎麼實現。console.log(滑稽) 首先,我們複
3D語音天氣球(原始碼分享)——建立可旋轉的3D球
開篇廢話: 在9月份時參加了一個網站的比賽,比賽的題目是需要使用第三方平臺提供的服務做出創意的作品。 於是我選擇使用語音服務,天氣服務,Unity3D,Android來製作一個3D語音天氣預報,我給它起名叫做3D語音天氣球(好土。。。) 雖然沒獲獎但我覺得這個專案中還是有
RHCS實現高可用中的共享儲存iscisi(mysql為例)
1、實驗環境 server2 172.25.66.2(配置Nginx、ricci、luci) server3 172.25.66.3(Apache) server4 172.25.66.4 (Apache) server5 172.25.66.5(配置Nginx
haproxy高可用之非搶占keepalived(一)
haproxy keepalivedHAProxy簡介HAProxy是免費、極速且可靠的用於為TCP和基於HTTP應用程序提供高可用、負載均衡和代理服務的解決方案,尤其適用於高負載且需要持久連接或7層處理機制的web站點。HAProxy還可以將後端的服務器與網絡隔離,起到保護後端服務器的作用。HAProxy的
haproxy高可用之非搶占keepalived(二)
keepalived haproxy高可用 非搶占1、安裝keepalivedyum install keepalived2、編輯配置文件 /etc/keepalived/keepalived.conf主:global_defs { notification_email { [emai
corosync+pacemaker+crmsh的高可用web集群的實現
corosync pacemaker crmsh httpd網絡規劃:node1:eth0:172.16.31.10/16node2: eth0: 172.16.31.11/16nfs: eth0: 172.16.31.12/15註:nfs在提供NFS服務的同時是一臺NTP服務器,可以讓node1和nod
heartbeat v2版CRM的高可用web集群的實現
heartbeat heartbeat v2 crm on上篇文章:heartbeat v1版CRM的高可用集群的實現集群架構圖 : 主節點(172.16.31.10)客戶端(172.16.31.12) Vitual IP(172.16.3
高可用分布式存儲(Corosync+Pacemaker+DRBD+MooseFS)
高可用分布式存儲(corosync+pacemaker+drbd+moosefs)高可用分布式存儲(Corosync+Pacemaker+DRBD+MooseFS)配置步驟:(1) 安裝與配置DRBD編譯安裝Master-server(2)安裝配置使用pcs安裝corosync+pacemaker(3
LVS+keepalived高可用負載均衡集群部署(二)---LAMP網站服務器與LVS服務器
虛擬機l 系統環境: RHEL7 l 硬件環境:虛擬機l 項目描述:為解決網站訪問壓力大的問題,需要搭建高可用、負載均衡的 web集群。 l 架構說明:整個服務架構采用功能分離的方式部署。後端采用2臺mysql 數據庫,實現主從結構以及讀寫分離。中間LAMP網站服務器共有2臺, 前端 2臺LVS服務
Redis高可用復制集群實現
priority emc 第一條 top 主服務器 filename byte 並行復制 加載 redis簡單介紹 Redis 是完全開源免費的,遵守BSD協議,是一個高性能的key-value數據庫。Redis 與其他 key - value 緩存產品有以下三個特點:
MySQL高可用方案之DRBD+MySQL+RHCS(下)
續:MySQL高可用方案之DRBD+MySQL+RHCS(上) 五、MySQL5.6.42安裝 安裝步驟(兩臺機器都要安裝) [[email protected] ~]# cd /opt/ [[email protected] opt]# ls mysql-5.6.42-linux
MySQL高可用方案之DRBD+MySQL+RHCS(上)
MySQL作為最流行的資料庫,它的高可用方案也是多種多樣,其中用的比較多的是MHA+增強版半同步。但是客戶使用的是DRBD+RHCS的方案,通過各方尋找安裝資料,最終形成一個完整的安裝文件,供參考 一、DRBD介紹 1.1 DRBD基本功能 Distributed Replicated Block De
陶輝的專欄(聚焦分散式高可用系統。個人網站:www.taohui.pub)
高效能網路程式設計 詳細的從底層kernel提供的各種tcp程式設計介面,包括其使用場景和實現邏輯,到高層的執行緒、鎖、各種設計模式,再到更高層的軟體工層上的思考,細緻探討如何使伺服器的TCP程式設計能夠達到高併發、高TPS、高可
分散式高可用性ID伺服器設計實現
服務端/後臺開發中如何生成id是每個開發者都會遇到的問題,在電商、遊戲領域尤其突出。如何保證生成id的唯一性、可靠性、高可用性,如何組織id的格式,在不同的應用場景和限制下實現方式也不盡相同。 我們的應用場景類似電商,在一個訂單的生命週期內,有多個邏輯需要生成各自的id,還要考慮到可讀性和靈活性
Java資料庫連線池的實現(不用框架)
前言:因為需要正式做專案,瞭解到了連線池這個東西。在網上找了很多資料,發現都是大同小異,各種轉載,看上去搜出來了幾十個答案,結果很可能是同一個。並且大多都是基於框架的資料庫連線池。可是我只是想採用MVC做一個不是很大的專案,不需要使用到框架啊。這可怎麼辦,最後沒
MySQL 高可用性—keepalived+mysql雙主(有詳細步驟和全部配置項解釋)
MySQL 高可用性—keepalived+mysql雙主(有詳細步驟和全部配置項解釋)
多執行緒高併發程式設計(12) -- 阻塞演算法實現ArrayBlockingQueue原始碼分析(1)
一.前言 前文探究了非阻塞演算法的實現ConcurrentLinkedQueue安全佇列,也說明了阻塞演算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlocking