【原創】強擼基於 .NET 的 Redis Cluster 叢集訪問元件
Hello 大家好,我是TANZAME,我們又見面了。今天我們來聊聊怎麼手擼一個 Redis Cluster 叢集客戶端,純手工有乾貨,您細品。
隨著業務增長,線上環境的QPS暴增,自然而然將當前的單機 Redis 切換到群集模式。燃鵝,我們悲劇地發現,ServiceStack.Redis這個官方推薦的 .NET 客戶端並沒有支援叢集模式。一通度娘FQ無果後,決定自己強擼一個基於ServiceStack.Redis的Redis叢集訪問元件。
話不多說,先上執行效果圖:
Redis-Cluster叢集使用 hash slot 演算法對每個key計算CRC16值,然後對16383取模,可以獲取key對應的 hash slot。Redis-Cluster中每個master都會持有部分 slot,在訪問key時根據計算出來的hash slot去找到具體的master節點,再由當前找到的節點去執行具體的 Redis 命令(具體可查閱官方說明文件)。
由於 ServiceStack.Redis已經實現了單個例項的Redis命令,因此我們可以將即將要實現的 Redis 叢集客戶端當做一個代理,它只負責計算 key 落在哪一個具體節點(定址)然後將Redis命令轉發給對應的節點執行即可。
ServiceStack.Redis的RedisClient是非執行緒安全的,ServiceStack.Redis 使用快取客戶端管理器(PooledRedisClientManager)來提高效能和併發能力,我們的Redis Cluster叢集客戶端也應整合PooledRedisClientManager來獲取 RedisClient 例項。
同時,Redis-Cluster叢集支援線上動態擴容和slot遷移,我們的Redis叢集客戶端也應具備自動智慧發現新節點和自動重新整理 slot 分佈的能力。
總結起來,要實現一個Redis-Cluster客戶端,需要實現以下幾個要點:
- 根據 key 計算 hash slot
- 自動讀取群集上所有的節點資訊
- 為節點分配快取客戶端管理器
- 將 hash slot 路由到正確的節點
- 自動發現新節點和自動重新整理slot分佈
如下面類圖所示,接下來我們詳細分析具體的程式碼實現。
一、CRC16
CRC即迴圈冗餘校驗碼,是資訊系統中一種常見的檢錯碼。CRC校驗碼不同的機構有不同的標準,這裡Redis遵循的標準是CRC-16-CCITT標準,這也是被XMODEM協議使用的CRC標準,所以也常用XMODEM CRC代指,是比較經典的“基於位元組查表法的CRC校驗碼生成演算法”。
1 /// <summary> 2 /// 根據 key 計算對應的雜湊槽 3 /// </summary> 4 public static int GetSlot(string key) 5 { 6 key = CRC16.ExtractHashTag(key); 7 // optimization with modulo operator with power of 2 equivalent to getCRC16(key) % 16384 8 return GetCRC16(key) & (16384 - 1); 9 } 10 11 /// <summary> 12 /// 計算給定位元組組的 crc16 檢驗碼 13 /// </summary> 14 public static int GetCRC16(byte[] bytes, int s, int e) 15 { 16 int crc = 0x0000; 17 18 for (int i = s; i < e; i++) 19 { 20 crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >> 8) ^ (bytes[i] & 0xFF)) & 0xFF]); 21 } 22 return crc & 0xFFFF; 23 }
二、讀取叢集節點
從叢集中的任意節點使用 CLUSTER NODES 命令可以讀取到叢集中所有的節點資訊,包括連線狀態,它們的標誌,屬性和分配的槽等等。CLUSTER NODES 以序列格式提供所有這些資訊,輸出示例:
d99b65a25ef726c64c565901e345f98c496a1a47 127.0.0.1:7007 master - 0 1592288083308 8 connected 2d71879d6529d1edbfeed546443051986245c58e 127.0.0.1:7003 master - 0 1592288084311 11 connected 10923-16383 654cdc25a5fa11bd44b5b716cdf07d4ce176efcd 127.0.0.1:7005 slave 484e73948d8aacd8327bf90b89469b52bff464c5 0 1592288085313 10 connected ed65d52dad7ef6854e0e261433b56a551e5e11cb 127.0.0.1:7004 slave 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 0 1592288081304 9 connected 754d0ec7a7f5c7765f784a6a2c370ea38ea0c089 127.0.0.1:7001 master - 0 1592288080300 9 connected 0-5460 484e73948d8aacd8327bf90b89469b52bff464c5 127.0.0.1:7002 master - 0 1592288082306 10 connected 5461-10922 2223bc6d099bd9838e5d2f1fbd9a758f64c554c4 127.0.0.1:7006 myself,slave 2d71879d6529d1edbfeed546443051986245c58e 0 0 6 connected
每個欄位的含義如下:
1. id
:節點 ID,一個40個字元的隨機字串,當一個節點被建立時不會再發生變化(除非CLUSTER RESET HARD
被使用)。
2. ip:port
:客戶端應該聯絡節點以執行查詢的節點地址。
3. flags
:逗號列表分隔的標誌:myself
,master
,slave
,fail?
,fail
,handshake
,noaddr
,noflags
。標誌在下一節詳細解釋。
4. master
:如果節點是從屬節點,並且主節點已知,則節點ID為主節點,否則為“ - ”字元。
5. ping-sent
:以毫秒為單位的當前啟用的ping傳送的unix時間,如果沒有掛起的ping,則為零。
6. pong-recv
:毫秒 unix 時間收到最後一個乒乓球。
7. config-epoch
:當前節點(或當前主節點,如果該節點是從節點)的配置時期(或版本)。每次發生故障切換時,都會建立一個新的,唯一的,單調遞增的配置時期。如果多個節點聲稱服務於相同的雜湊槽,則具有較高配置時期的節點將獲勝。
8. link-state
:用於節點到節點叢集匯流排的鏈路狀態。我們使用此連結與節點進行通訊。可以是connected
或disconnected
。
9. slot
:雜湊槽號或範圍。從引數9開始,但總共可能有16384個條目(限制從未達到)。這是此節點提供的雜湊槽列表。如果條目僅僅是一個數字,則被解析為這樣。如果它是一個範圍,它是在形式start-end
,並且意味著節點負責所有雜湊時隙從start
到end
包括起始和結束值。
標誌的含義(欄位編號3):
myself
:您正在聯絡的節點。
master
:節點是主人。
slave
:節點是從屬的。
fail?
:節點處於PFAIL
狀態。對於正在聯絡的節點無法訪問,但仍然可以在邏輯上訪問(不處於FAIL
狀態)。
fail
:節點處於FAIL
狀態。對於將PFAIL
狀態提升為FAIL
的多個節點而言,這是無法訪問的。
handshake
:不受信任的節點,我們握手。
noaddr
:此節點沒有已知的地址。
noflags
:根本沒有標誌。
1 // 讀取叢集上的節點資訊 2 static IList<InternalClusterNode> ReadClusterNodes(IEnumerable<ClusterNode> source) 3 { 4 RedisClient c = null; 5 StringReader reader = null; 6 IList<InternalClusterNode> result = null; 7 8 int index = 0; 9 int rowCount = source.Count(); 10 11 foreach (var node in source) 12 { 13 try 14 { 15 // 從當前節點讀取REDIS叢集節點資訊 16 index += 1; 17 c = new RedisClient(node.Host, node.Port, node.Password); 18 RedisData data = c.RawCommand("CLUSTER".ToUtf8Bytes(), "NODES".ToUtf8Bytes()); 19 string info = Encoding.UTF8.GetString(data.Data); 20 21 // 將讀回的字元文字轉成強型別節點實體 22 reader = new StringReader(info); 23 string line = reader.ReadLine(); 24 while (line != null) 25 { 26 if (result == null) result = new List<InternalClusterNode>(); 27 InternalClusterNode n = InternalClusterNode.Parse(line); 28 n.Password = node.Password; 29 result.Add(n); 30 31 line = reader.ReadLine(); 32 } 33 34 // 只要任意一個節點拿到叢集資訊,直接退出 35 if (result != null && result.Count > 0) break; 36 } 37 catch (Exception ex) 38 { 39 // 出現異常,如果還沒到最後一個節點,則繼續使用下一下節點讀取叢集資訊 40 // 否則丟擲異常 41 if (index < rowCount) 42 Thread.Sleep(100); 43 else 44 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 45 } 46 finally 47 { 48 if (reader != null) reader.Dispose(); 49 if (c != null) c.Dispose(); 50 } 51 } 52 53 54 if (result == null) 55 result = new List<InternalClusterNode>(0); 56 return result; 57 } 58 59 /// <summary> 60 /// 從 cluster nodes 的每一行命令裡讀取出叢集節點的相關資訊 61 /// </summary> 62 /// <param name="line">叢集命令</param> 63 /// <returns></returns> 64 public static InternalClusterNode Parse(string line) 65 { 66 if (string.IsNullOrEmpty(line)) 67 throw new ArgumentException("line"); 68 69 InternalClusterNode node = new InternalClusterNode(); 70 node._nodeDescription = line; 71 string[] segs = line.Split(' '); 72 73 node.NodeId = segs[0]; 74 node.Host = segs[1].Split(':')[0]; 75 node.Port = int.Parse(segs[1].Split(':')[1]); 76 node.MasterNodeId = segs[3] == "-" ? null : segs[3]; 77 node.PingSent = long.Parse(segs[4]); 78 node.PongRecv = long.Parse(segs[5]); 79 node.ConfigEpoch = int.Parse(segs[6]); 80 node.LinkState = segs[7]; 81 82 string[] flags = segs[2].Split(','); 83 node.IsMater = flags[0] == MYSELF ? flags[1] == MASTER : flags[0] == MASTER; 84 node.IsSlave = !node.IsMater; 85 int start = 0; 86 if (flags[start] == MYSELF) 87 start = 1; 88 if (flags[start] == SLAVE || flags[start] == MASTER) 89 start += 1; 90 node.NodeFlag = string.Join(",", flags.Skip(start)); 91 92 if (segs.Length > 8) 93 { 94 string[] slots = segs[8].Split('-'); 95 node.Slot.Start = int.Parse(slots[0]); 96 if (slots.Length > 1) node.Slot.End = int.Parse(slots[1]); 97 98 for (int index = 9; index < segs.Length; index++) 99 { 100 if (node.RestSlots == null) 101 node.RestSlots = new List<HashSlot>(); 102 103 slots = segs[index].Split('-'); 104 105 int s1 = 0; 106 int s2 = 0; 107 bool b1 = int.TryParse(slots[0], out s1); 108 bool b2 = int.TryParse(slots[1], out s2); 109 if (!b1 || !b2) 110 continue; 111 else 112 node.RestSlots.Add(new HashSlot(s1, slots.Length > 1 ? new Nullable<int>(s2) : null)); 113 } 114 } 115 116 return node; 117 }View Code
三、為節點分配快取客戶端管理器
在單例項的Redis中,我們通過 PooledRedisClientManager 這個管理器來獲取RedisClient。借鑑這個思路,在Redis Cluster叢集中,我們為每一個主節點例項化一個 PooledRedisClientManager,並且該主節點持有的 slot 都共享一個 PooledRedisClientManager 例項。以 slot 做為 key 將 slot 與 PooledRedisClientManager 一一對映並快取起來。
1 // 初始化叢集管理 2 void Initialize(IList<InternalClusterNode> clusterNodes = null) 3 { 4 // 從 redis 讀取叢集資訊 5 IList<InternalClusterNode> nodes = clusterNodes == null ? RedisCluster.ReadClusterNodes(_source) : clusterNodes; 6 7 // 生成主節點,每個主節點的 slot 對應一個REDIS客戶端緩衝池管理器 8 IList<InternalClusterNode> masters = null; 9 IDictionary<int, PooledRedisClientManager> managers = null; 10 foreach (var n in nodes) 11 { 12 // 節點無效或者 13 if (!(n.IsMater && 14 !string.IsNullOrEmpty(n.Host) && 15 string.IsNullOrEmpty(n.NodeFlag) && 16 (string.IsNullOrEmpty(n.LinkState) || n.LinkState == InternalClusterNode.CONNECTED))) continue; 17 18 n.SlaveNodes = nodes.Where(x => x.MasterNodeId == n.NodeId); 19 if (masters == null) 20 masters = new List<InternalClusterNode>(); 21 masters.Add(n); 22 23 // 用每一個主節點的雜湊槽做鍵,匯入REDIS客戶端緩衝池管理器 24 // 然後,方法表指標(又名型別物件指標)上場,佔據 4 個位元組。 4 * 16384 / 1024 = 64KB 25 if (managers == null) 26 managers = new Dictionary<int, PooledRedisClientManager>(); 27 28 string[] writeHosts = new[] { n.HostString }; 29 string[] readHosts = n.SlaveNodes.Where(n => false).Select(n => n.HostString).ToArray(); 30 var pool = new PooledRedisClientManager(writeHosts, readHosts, _config); 31 managers.Add(n.Slot.Start, pool); 32 if (n.Slot.End != null) 33 { 34 // 這個範圍內的雜湊槽都用同一個緩衝池 35 for (int s = n.Slot.Start + 1; s <= n.Slot.End.Value; s++) 36 managers.Add(s, pool); 37 } 38 if (n.RestSlots != null) 39 { 40 foreach (var slot in n.RestSlots) 41 { 42 managers.Add(slot.Start, pool); 43 if (slot.End != null) 44 { 45 // 這個範圍內的雜湊槽都用同一個緩衝池 46 for (int s = slot.Start + 1; s <= slot.End.Value; s++) 47 managers.Add(s, pool); 48 } 49 } 50 } 51 } 52 53 _masters = masters; 54 _redisClientManagers = managers; 55 _clusterNodes = nodes != null ? nodes : null; 56 57 if (_masters == null) _masters = new List<InternalClusterNode>(0); 58 if (_clusterNodes == null) _clusterNodes = new List<InternalClusterNode>(0); 59 if (_redisClientManagers == null) _redisClientManagers = new Dictionary<int, PooledRedisClientManager>(0); 60 61 if (_masters.Count > 0) 62 _source = _masters.Select(n => new ClusterNode(n.Host, n.Port, n.Password)).ToList(); 63 }View Code
四、將 hash slot 路由到正確的節點
在訪問一個 key 時,根據第三步快取起來的 PooledRedisClientManager ,用 key 計算出來的 hash slot 值可以快速找出這個 key 對應的 PooledRedisClientManager 例項,呼叫 PooledRedisClientManager.GetClient() 即可將 hash slot 路由到正確的主節點。
1 // 執行指定動作並返回值 2 private T DoExecute<T>(string key, Func<RedisClient, T> action) => this.DoExecute(() => this.GetRedisClient(key), action); 3 4 // 執行指定動作並返回值 5 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1) 6 { 7 RedisClient c = null; 8 try 9 { 10 c = slot(); 11 return action(c); 12 } 13 catch (Exception ex) 14 { 15 // 此處省略 ... 16 } 17 finally 18 { 19 if (c != null) 20 c.Dispose(); 21 } 22 } 23 24 // 獲取指定key對應的主裝置節點 25 private RedisClient GetRedisClient(string key) 26 { 27 if (string.IsNullOrEmpty(key)) 28 throw new ArgumentNullException("key"); 29 30 int slot = CRC16.GetSlot(key); 31 if (!_redisClientManagers.ContainsKey(slot)) 32 throw new SlotNotFoundException(string.Format("No reachable node in cluster for slot {{{0}}}", slot), slot, key); 33 34 var pool = _redisClientManagers[slot]; 35 return (RedisClient)pool.GetClient(); 36 }
五、自動發現新節點和自動重新整理slot分佈
在實際生產環境中,Redis 叢集經常會有新增/刪除節點、遷移 slot 、主節點宕機從節點轉主節點等,針對這些情況,我們的 Redis Cluster 元件必須具備自動發現節點和重新整理在 第三步 快取起來的 slot 的能力。在這裡我的實現思路是當節點執行 Redis 命令時返回 RedisException 異常時就強制重新整理叢集節點資訊並重新快取 slot 與 節點之間的對映。
1 // 執行指定動作並返回值 2 private T DoExecute<T>(Func<RedisClient> slot, Func<RedisClient, T> action, int tryTimes = 1) 3 { 4 RedisClient c = null; 5 try 6 { 7 c = slot(); 8 return action(c); 9 } 10 catch (Exception ex) 11 { 12 if (!(ex is RedisException) || tryTimes == 0) throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 13 else 14 { 15 tryTimes -= 1; 16 // 嘗試重新重新整理叢集資訊 17 bool isRefresh = DiscoveryNodes(_source, _config); 18 if (isRefresh) 19 // 叢集節點有更新過,重新執行 20 return this.DoExecute(slot, action, tryTimes); 21 else 22 // 叢集節點未更新過,直接丟擲異常 23 throw new RedisClusterException(ex.Message, c != null ? c.GetHostString() : string.Empty, ex); 24 } 25 } 26 finally 27 { 28 if (c != null) 29 c.Dispose(); 30 } 31 } 32 33 // 重新重新整理叢集資訊 34 private bool DiscoveryNodes(IEnumerable<ClusterNode> source, RedisClientManagerConfig config) 35 { 36 bool lockTaken = false; 37 try 38 { 39 // noop 40 if (_isDiscoverying) { } 41 42 Monitor.Enter(_objLock, ref lockTaken); 43 44 _source = source; 45 _config = config; 46 _isDiscoverying = true; 47 48 // 跟上次同步時間相隔 {MONITORINTERVAL} 秒鐘以上才需要同步 49 if ((DateTime.Now - _lastDiscoveryTime).TotalMilliseconds >= MONITORINTERVAL) 50 { 51 bool isRefresh = false; 52 IList<InternalClusterNode> newNodes = RedisCluster.ReadClusterNodes(_source); 53 foreach (var node in newNodes) 54 { 55 var n = _clusterNodes.FirstOrDefault(x => x.HostString == node.HostString); 56 isRefresh = 57 n == null || // 新節點 58 n.Password != node.Password || // 密碼變了 59 n.IsMater != node.IsMater || // 主變從或者從變主 60 n.IsSlave != node.IsSlave || // 主變從或者從變主 61 n.NodeFlag != node.NodeFlag || // 節點標記位變了 62 n.LinkState != node.LinkState || // 節點狀態位變了 63 n.Slot.Start != node.Slot.Start || // 雜湊槽變了 64 n.Slot.End != node.Slot.End || // 雜湊槽變了 65 (n.RestSlots == null && node.RestSlots != null) || 66 (n.RestSlots != null && node.RestSlots == null); 67 if (!isRefresh && n.RestSlots != null && node.RestSlots != null) 68 { 69 var slots1 = n.RestSlots.OrderBy(x => x.Start).ToList(); 70 var slots2 = node.RestSlots.OrderBy(x => x.Start).ToList(); 71 for (int index = 0; index < slots1.Count; index++) 72 { 73 isRefresh = 74 slots1[index].Start != slots2[index].Start || // 雜湊槽變了 75 slots1[index].End != slots2[index].End; // 雜湊槽變了 76 if (isRefresh) break; 77 } 78 } 79 80 if (isRefresh) break; 81 } 82 83 if (isRefresh) 84 { 85 // 重新初始化叢集 86 this.Dispose(); 87 this.Initialize(newNodes); 88 this._lastDiscoveryTime = DateTime.Now; 89 } 90 } 91 92 // 最後重新整理時間在 {MONITORINTERVAL} 內,表示是最新群集資訊 newest 93 return (DateTime.Now - _lastDiscoveryTime).TotalMilliseconds < MONITORINTERVAL; 94 } 95 finally 96 { 97 if (lockTaken) 98 { 99 _isDiscoverying = false; 100 Monitor.Exit(_objLock); 101 } 102 } 103 }View Code
六、配置訪問元件呼叫入口
最後我們需要為元件提供訪問入口,我們用 RedisCluster 類實現 字串、列表、雜湊、集合、有序集合和Keys的基本操作,並且用 RedisClusterFactory 工廠類對外提供單例操作,這樣就可以像單例項 Redis 那樣呼叫 Redis Cluster 叢集。呼叫示例:
var node = new ClusterNode("127.0.0.1", 7001); var redisCluster = RedisClusterFactory.Configure(node, config); string key = "B070x14668"; redisCluster.Set(key, key); string value = redisCluster.Get<string>(key); redisCluster.Del(key);
1 /// <summary> 2 /// REDIS 叢集工廠 3 /// </summary> 4 public class RedisClusterFactory 5 { 6 static RedisClusterFactory _factory = new RedisClusterFactory(); 7 static RedisCluster _cluster = null; 8 9 /// <summary> 10 /// Redis 叢集 11 /// </summary> 12 public static RedisCluster Cluster 13 { 14 get 15 { 16 if (_cluster == null) 17 throw new Exception("You should call RedisClusterFactory.Configure to config cluster first."); 18 else 19 return _cluster; 20 } 21 } 22 23 /// <summary> 24 /// 初始化 <see cref="RedisClusterFactory"/> 類的新例項 25 /// </summary> 26 private RedisClusterFactory() 27 { 28 } 29 30 /// <summary> 31 /// 配置 REDIS 叢集 32 /// <para>若群集中有指定 password 的節點,必須使用 IEnumerable<ClusterNode> 過載列舉出這些節點</para> 33 /// </summary> 34 /// <param name="node">叢集節點</param> 35 /// <returns></returns> 36 public static RedisCluster Configure(ClusterNode node) 37 { 38 return RedisClusterFactory.Configure(node, null); 39 } 40 41 /// <summary> 42 /// 配置 REDIS 叢集 43 /// <para>若群集中有指定 password 的節點,必須使用 IEnumerable<ClusterNode> 過載列舉出這些節點</para> 44 /// </summary> 45 /// <param name="node">叢集節點</param> 46 /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param> 47 /// <returns></returns> 48 public static RedisCluster Configure(ClusterNode node, RedisClientManagerConfig config) 49 { 50 return RedisClusterFactory.Configure(new List<ClusterNode> { node }, config); 51 } 52 53 /// <summary> 54 /// 配置 REDIS 叢集 55 /// </summary> 56 /// <param name="nodes">叢集節點</param> 57 /// <param name="config"><see cref="RedisClientManagerConfig"/> 客戶端緩衝池配置</param> 58 /// <returns></returns> 59 public static RedisCluster Configure(IEnumerable<ClusterNode> nodes, RedisClientManagerConfig config) 60 { 61 if (nodes == null) 62 throw new ArgumentNullException("nodes"); 63 64 if (nodes == null || nodes.Count() == 0) 65 throw new ArgumentException("There is no nodes to configure cluster."); 66 67 if (_cluster == null) 68 { 69 lock (_factory) 70 { 71 if (_cluster == null) 72 { 73 RedisCluster c = new RedisCluster(nodes, config); 74 _cluster = c; 75 } 76 } 77 } 78 79 return _cluster; 80 } 81 }View Code
總結
今天我們詳細介紹瞭如何從0手寫一個Redis Cluster叢集客戶端訪問元件,相信對同樣在尋找類似解決方案的同學們會有一定的啟發,喜歡的同學請點個 star。在沒有相同案例可以參考的情況下筆者通過查閱官方說明文件和借鑑 Java 的 JedisCluster 的實現思路,雖說磕磕碰碰但最終也初步完成這個元件並投入使用,必須給自己加一個雞腿!!在此我有一個小小的疑問,.NET 的同學們在用 Redis 叢集時,你們是用什麼元件耍的,為何網上的相關介紹和現成元件幾乎都沒有?歡迎討論。
GitHub 程式碼託管:https://github.com/TANZAME/ServiceStack.Redis.Cluster
技術交流 QQ 群:816425449