1. 程式人生 > >Ring實現原理剖析

Ring實現原理剖析

簡介

OpenStack是一個美國國家航空航天局和Rackspace合作研發的開源雲端計算專案,併成為Apache下的一個重要開源專案,目前已經發展到了180家公司參與其中。

OpenStack Object Storage(SwiftOpenStack開源雲端計算專案的子專案之一。Swift的目的是使用普通硬體來構建冗餘的、可擴充套件的分散式物件儲存叢集,儲存容量可達PB級。OpenStack Object Storage 最初由 Rackspace 採用Python語言開發,並於 2010 年 7 月貢獻給 OpenStack ,作為該開源專案的一部分。它的目的是用於託管 Rackspace的 Cloud Files service ,原始專案代號是 swift,所以沿用至今。

在分散式物件儲存中的一個關鍵問題是資料該如何存放。Ring是Swift中最重要的元件,用於記錄儲存物件與物理位置間對映關係。在涉及查詢account、container、object資訊時就需要查詢叢集的ring資訊。

先來看一下Swift文件中關於Ring的描述:

Ring用來確定資料駐留在叢集中的位置。有單獨對應於Account資料庫、container資料庫和單個object的ring。

Ring中每個partition在叢集中都(預設)有3個replica。每個partition的位置由ring來維護,並存儲在對映中。

Ring使用zone的概念來保證資料的隔離。每個partition的replica都確保放在了不同的zone中。一個zone可以是一個硬碟,一個伺服器,一個機架,一個交換機,甚至是一個數據中心............

.......

在上述Ring的特性描述中提到了Ring使用zone、device、partition和replica等等來維護資料和磁碟間的對映資訊。那麼在Ring的背後採用什麼演算法,使用了什麼機制來保證資料的安全、高效和可擴充套件呢?這些概念對於資料儲存帶來了什麼好處?本文逐步深入探討了Swift如何通過Ring元件來實現冗餘的、可擴充套件的目的。

1.      普通Hash演算法與場景分析

先來看一個簡單的例子假設我們手裡有N臺儲存伺服器(以下簡稱node),打算用於圖片檔案儲存,為了使伺服器的負載均衡,需要把物件均勻地對映到每臺伺服器上,通常會使用雜湊演算法來實現,計算步驟如下:

1.計算object的hash值Key

2.計算Key mod N值

N個儲存節點,將Key模N得到的餘數就是該Key對應的值需要存放的節點。比如,N是2,那麼值為0、1、2、3、4的Key需要分別存放在0、1、0、1和0號節點上。如果雜湊演算法是均勻的,資料就會被平均分配到兩個節點中。如果每個資料的訪問量比較平均,負載也會被平均分配到兩個節點上。

但是,當資料量和訪問量進一步增加,兩個節點無法滿足需求的時候,需要增加一個節點來服務客戶端的請求。這時,N變成了3,對映關係變成了Key mod (N+1),因此,上述雜湊值為2、3、4的資料需要重新分配(2->server 2,3 -> server 0,4 -> server 1)。如果資料量很大的話,那麼資料量的遷移工作將會非常大。當N已經很大,從N加入一個節點變成N+1個節點的過程,會導致整個雜湊環的重新分配,這個過程幾乎是無法容忍的,幾乎全部的資料都要重新移動一遍。

       我們舉例說明,假設有100個node的叢集,將107項資料使用md5 hash演算法分配到每個node中,Python程式碼如下:

from hashlib import md5
from struct import unpack_from

NODE_COUNT = 
100
DATA_ID_COUNT = 10000000

node_counts = [0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    
# This just pulls part of the hash out as an integer
    hsh = unpack_from('>I', md5(data_id).digest())[0]
    node_id = hsh % NODE_COUNT
    node_counts[node_id] += 
1
desired_count = DATA_ID_COUNT / NODE_COUNT
print'%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 
100.0 * (max_count - desired_count) / desired_count
print'%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 
100.0 * (desired_count - min_count) / desired_count
print'%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)


100000: Desired data ids per node
100695: Most data ids on one node, 0.69% over
99073: Least data ids on one node, 0.93% under

分佈結果如下所示:

名稱

資料項數量

百分比值

資料項均值

100000

0%

最多資料項節點

100695

+0.69%

最少資料項節點

99073

-0.93%

從資料分佈上來看擁有最多/最少資料項的節點沒有超出平均值的1%。現在假設增加一個節點提供負載能力,不過得重新分配資料項到新的節點上,程式碼如下:

from hashlib import md5
from struct import unpack_from  


NODE_COUNT = 100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000

moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from(
'>I', md5(str(data_id)).digest())[0]
    node_id = hsh % NODE_COUNT
    new_node_id = hsh % NEW_NODE_COUNT
    
if node_id != new_node_id:
        moved_ids += 
1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print'%d ids moved, %.02f%%' % (moved_ids, percent_moved)

9900989 ids moved, 99.01%

通過計算我們發現,為了提高叢集1%的儲存能力,我們需要移動9900989個數據項,也就是99.01%的資料項!顯然,這種演算法嚴重地影響了系統的效能和可擴充套件性。

clip_image002[4]增加1%的儲存能力=移動99%的資料?

這種虧本生意顯然做不得,那麼怎麼辦呢?一致性雜湊演算法就是為了解決這個問題而來。

2.一致性雜湊演算法

一致性雜湊演算法是由D. Darger、E. Lehman和T. Leighton 等人於1997年在論文Consistent Hashing and Random Trees:Distributed Caching Protocols for Relieving Hot Spots On the World Wide Web首次提出,目的主要是為了解決分散式網路中的熱點問題。在其論文中,提出了一致性雜湊演算法並給出了衡量一個雜湊演算法的4個指標:

平衡性(Balance)

平衡性是指Hash的結果能夠儘可能分佈均勻,充分利用所有快取空間。

單調性(Monotonicity)

單調性是指如果已經有一些內容通過雜湊分派到了相應的緩衝中,又有新的緩衝加入到系統中。雜湊的結果應能夠保證原有已分配的內容可以被對映到新的緩衝中去,而不會被對映到舊的緩衝集合中的其他緩衝區。

分散性(Spread)

分散性定義了分散式環境中,不同終端通過Hash過程將內容對映至快取上時,因可見快取不同,Hash結果不一致,相同的內容被對映至不同的緩衝區。

負載(Load)

負載是對分散性要求的另一個緯度。既然不同的終端可以將相同的內容對映到不同的緩衝區中,那麼對於一個特定的緩衝區而言,也可能被不同的使用者對映為不同的內容。

Swift使用該演算法的主要目的是在改變叢集的node數量時(增加/刪除伺服器),能夠儘可能少地改變已存在key和node的對映關係,以滿足單調性。一致性雜湊一般兩種思路:

1.遷移為主要特點(swift初期採用)

2.引入虛結點,減少移動為特點(swift現採用)

具體步驟如下:

1.    首先求出每個節點(機器名或者是IP地址)的雜湊值,並將其分配到一個圓環區間上(這裡取0-2^32)。

2.    求出需要儲存物件的雜湊值,也將其分配到這個圓環上。

       3.    從物件對映到的位置開始順時針查詢,將物件儲存到找到的第一個節點上。

其中這個從雜湊到位置對映的圓環,我們就可以理解為何使用術語“Ring”來表示了。雜湊環空間上的分佈如圖1所示:

clip_image004[4]

1 雜湊環空間

假設在這個環形雜湊空間中,Cache5被對映在Cache3和Cache4之間,那麼受影響的將僅是沿Cache5逆時針遍歷直到下一個Cache(Cache3)之間的物件(它們本來對映到Cache4上)。

clip_image006[4]

2 一致性雜湊演算法的資料移動

現在,使用該演算法在叢集中增加一個node,同時要保證每個節點的資料項數量均衡,程式碼如下所示,其中node_range_starts表示每個node的資料項的開始位置。

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from 

NODE_COUNT = 
100
NEW_NODE_COUNT = 101
DATA_ID_COUNT = 10000000

node_range_starts = []
for node_id in xrange(NODE_COUNT):
    node_range_starts.append(DATA_ID_COUNT /
                             NODE_COUNT * node_id)
new_node_range_starts = []

for new_node_id in xrange(NEW_NODE_COUNT):
    new_node_range_starts.append(DATA_ID_COUNT /
                              NEW_NODE_COUNT * new_node_id)
moved_ids = 
0
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from(
'>I', md5(str(data_id)).digest())[0]
    node_id = bisect_left(node_range_starts,
                          hsh % DATA_ID_COUNT) % NODE_COUNT
    new_node_id = bisect_left(new_node_range_starts,
                          hsh % DATA_ID_COUNT) % NEW_NODE_COUNT
    
if node_id != new_node_id:
        moved_ids += 
1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print'%d ids moved, %.02f%%' % (moved_ids, percent_moved)

4901707 ids moved, 49.02%

結果雖然比之前好了些,但是提高1%的效能與移動50%的資料仍不理想。

clip_image008[4]

增加1%能力=移動50%資料?

引入虛擬節點(Partition)

考慮到雜湊演算法在node較少的情況下,改變node數會帶來巨大的資料遷移。為了解決這種情況,一致性雜湊引入了“虛擬節點”的概念: “虛擬節點”是實際節點在環形空間的複製品,一個實際節點對應了若干個“虛擬節點”,“虛擬節點”在雜湊空間中以雜湊值排列。

clip_image010[4]

3 虛擬節點

引入了“虛擬節點”後,對映關係就從【object--->node】轉換成了【object--->virtual node---> node】。查詢object所在node的對映關係如下圖所示。

clip_image012[4]

4 物件、虛結點、節點間的對映關係

對100個node細分為1000個vnode,使用vnode_range_starts來指定vnode的開始範圍,vnode2node表示vnode到node的指派,然後增加一個node,完成vnode的重新分配,並計算所移動的資料項:

from bisect import bisect_left
from hashlib import md5
from struct import unpack_from

NODE_COUNT = 
100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000

vnode_range_starts = []
vnode2node = []

for vnode_id in xrange(VNODE_COUNT):
    vnode_range_starts.append(DATA_ID_COUNT /
                              VNODE_COUNT * vnode_id)
    vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
NEW_NODE_COUNT = NODE_COUNT + 
1
vnodes_to_reassign = VNODE_COUNT / NEW_NODE_COUNT
while vnodes_to_reassign > 0:
    
for node_to_take_from in xrange(NODE_COUNT):
        
for vnode_id, node_id in enumerate(new_vnode2node):
            
if node_id == node_to_take_from:
                new_vnode2node[vnode_id] = new_node_id
                vnodes_to_reassign -= 
1
if vnodes_to_reassign <= 0:
                    
break
if vnodes_to_reassign <= 0:
            
break
moved_ids = 0
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from(
'>I', md5(str(data_id)).digest())[0]
    vnode_id = bisect_left(vnode_range_starts,
                         hsh % DATA_ID_COUNT) % VNODE_COUNT
    node_id = vnode2node[vnode_id]
    new_node_id = new_vnode2node[vnode_id]
    
if node_id != new_node_id:
        moved_ids += 
1
percent_moved = 100.0 * moved_ids / DATA_ID_COUNT
print'%d ids moved, %.02f%%' % (moved_ids, percent_moved)

90108 ids moved, 0.90%

結果顯示,僅移動了0.9%的資料。與前面相比,整個叢集的效能大大提高了。

clip_image014[4]增加1%的能力=移動0.9%資料

  固化虛節點到資料項的對映

由於虛節點個數在叢集的整個生命週期中是不會變化的,它與資料項的對映關係不會發生變化,改變的僅是vnode與node的對映關係,所以需對以上程式碼進行優化。

from struct import unpack_from
from hashlib import md5
from time import time

NODE_COUNT = 
100
DATA_ID_COUNT = 10000000
VNODE_COUNT = 1000

begin = time()
vnode2node = []

for vnode_id in xrange(VNODE_COUNT):
    vnode2node.append(vnode_id % NODE_COUNT)
new_vnode2node = list(vnode2node)
new_node_id = NODE_COUNT
vnodes_to_assign = VNODE_COUNT / (NODE_COUNT + 
1)
while vnodes_to_assign > 0:
    
for node_to_take_from in xrange(NODE_COUNT):
        
for vnode_id, node_id in enumerate(vnode2node):
            
if node_id == node_to_take_from:
                vnode2node[vnode_id] = new_node_id
                vnodes_to_assign -= 
1
if vnodes_to_assign <= 0:
                    
break
if vnodes_to_assign <= 0:
            
break
moved_id = 0
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    hsh = unpack_from(
'>I', md5(str(data_id)).digest())[0]
    vnode_id = hsh % VNODE_COUNT
#(1)
    node_id = vnode2node[vnode_id]
    new_node_id = new_vnode2node[vnode_id]
    
if node_id != new_node_id:
        moved_id += 
1
percent_moved = 100.0 * moved_id / DATA_ID_COUNT
print'%d ids moved, %.02f%%' % (moved_id, percent_moved)
print'%d seconds pass ...' % (time() - begin)

90108 ids moved, 0.90%

  預設合理的虛結點數

現在已構建好了一致性雜湊ring的原型。但是存在一個問題,以上例子中,1000個虛結點對應著100個結點,結點變動時,虛結點就需要重新分配到結點。當100個結點擴充套件到1001個結點時,此時至少有一個結點分配不到虛結點,那麼就需要再增加虛結點數,而虛結點是與資料項對應的雜湊關係,如果改變了虛節點數,那麼就需要重新分配所有的資料項,這將導致移動大量的資料。

所以在設定虛結點數的時候,需要對系統預期的規模做充分考慮,假如叢集的規模不會超過6000個結點,那麼可以將虛結點數設定為結點數的100倍。這樣,變動任意一個結點的負載僅影響1%的資料項。此時有6百萬個vnode數,使用2bytes來儲存結點數(0~65535)。基本的記憶體佔用是6*106*2bytes=12Mb,對於伺服器來說完全可以承受。

在此,引入了2個概念:

在swift中,為了區分vnode和node,將vnode稱為partition

  位操作代替取模操作

此外,在計算機中使用位操作來確定partition的位置比取模更快。所以,在此引入了partition power的概念。

繼續改進ring的程式碼,設有65536個node(2^16),有128(2^7)倍個partition數(2^23)。由於MD5碼是32位的,使用PARTITION_SHIFT(等於32-PARTITION_POWER)將資料項的MD5雜湊值對映到partition的2^23的空間中。

from array import array
from hashlib import md5
from struct import unpack_from

PARTITION_POWER = 
23
PARTITION_SHIFT = 32 - PARTITION_POWER
NODE_COUNT = 
65536
DATA_ID_COUNT = 100000000

part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
node_counts = [
0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from(
'>I',
        md5(str(data_id)).digest())[
0] >> PARTITION_SHIFT
    node_id = part2node[part]
    node_counts[node_id] += 
1
desired_count = DATA_ID_COUNT / NODE_COUNT
print'%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 
100.0 * (max_count - desired_count) / desired_count
print'%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 
100.0 * (desired_count - min_count) / desired_count
print'%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)


1525: Desired data ids per node
1683: Most data ids on one node, 10.36% over
1360: Least data ids on one node, 10.82% under

資料不均衡的原因在於資料項相對於partition數太小了(10^8對2^23),若資料項越多,分佈越均衡。

保證資料安全,引入replica

到目前為止,在叢集中的資料在本地節點上只有一份,節點一旦發生故障就可能會造成資料的永久性丟失。因此,Swift中引入replica的概念使用冗餘副本來保證資料的安全。replica的預設值為3,其理論依據主要來源於NWR策略。

NWR是一種在分散式儲存系統中用於控制一致性級別的一種策略。在Amazon的Dynamo雲端儲存系統中,就應用NWR來控制一致性。每個字母的涵義如下:

N:同一份資料的Replica的份數
W:是更新一個數據物件的時候需要確保成功更新的份數
R讀取一個數據需要讀取的Replica的份數

在分散式系統中,資料的單點是不允許存在的。即線上正常存在的Replica數量是1的情況是非常危險的,因為一旦這個Replica再次錯誤,就可能發生資料的永久性錯誤。假如我們把N設定成為2,那麼,只要有一個儲存節點發生損壞,就會有單點的存在。所以N必須大於2。N約高,系統的維護和整體成本就越高。工業界通常把N設定為3。

因此,在ring的程式碼中引入replica,數量設定為3,其中 node_ids記錄的是3個replica存放的node id。part2node[part]是根據partition id 找到對應的node id。

from array import array
from hashlib import md5
from struct import unpack_from

REPLICAS = 
3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 
2 ** PARTITION_POWER - 1
NODE_COUNT = 256
DATA_ID_COUNT = 10000000

part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
node_counts = [
0] * NODE_COUNT
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from(
'>I',
        md5(str(data_id)).digest())[
0] >> PARTITION_SHIFT
    node_ids = [part2node[part]]
    node_counts[node_ids[
0]] += 1
for replica in xrange(1, REPLICAS):
        
while part2node[part] in node_ids:
            part += 
1
if part > PARTITION_MAX:
                part = 
0
        node_ids.append(part2node[part])
        node_counts[node_ids[-
1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print'%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 
100.0 * (max_count - desired_count) / desired_count
print'%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 
100.0 * (desired_count - min_count) / desired_count
print'%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)


117186: Desired data ids per node
118133: Most data ids on one node, 0.81% over
116093: Least data ids on one node, 0.93% under

    結果如上,由於使用了256個node,分佈約有1%的波動,比較均勻了。

但是存在有2個問題:

  隨機分配對映

首先part2node是基於順序分配的,對於給定的node,它所有partition的copies均在另兩個node上,若某個node頻繁宕機,與它相應的兩個node上的資料項需要頻繁複制。解決方法是隨機分配partition到node的對映。

  分割槽容忍性和引入zone

其次是當前的叢集不滿足CAP原理中的分割槽容忍性(Partition Tolerance)。Gilbert 和Lynch分割槽容忍性定義如下:

No set of failures less than total network failure is allowed to cause the system to respond incorrectly

翻譯一下,就是除了全部網路節點發生故障以外,所有子節點集合的故障都不允許導致整個系統的響應故障。

現在為止,這些node都在一個機架上,一旦發生斷電,網路故障,那麼將喪失這一性質。因此就需要一種機制對機器的物理位置進行隔離。所以引入了zone的概念。

在ring程式碼中引入zone_count,把這些node分割到16個zone中去。其中partition的replica不能放在同一個node上或同一個zone內。

from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from

REPLICAS = 
3
PARTITION_POWER = 16
PARTITION_SHIFT = 32 - PARTITION_POWER
PARTITION_MAX = 
2 ** PARTITION_POWER - 1
NODE_COUNT = 256
ZONE_COUNT = 16
DATA_ID_COUNT = 10000000

node2zone = []
while len(node2zone) < NODE_COUNT:
    zone = 
0
while zone < ZONE_COUNT and len(node2zone) < NODE_COUNT:
        node2zone.append(zone)
        zone += 
1
part2node = array('H')
for part in xrange(2 ** PARTITION_POWER):
    part2node.append(part % NODE_COUNT)
shuffle(part2node)
node_counts = [
0] * NODE_COUNT
zone_counts = [
0] * ZONE_COUNT
for data_id in xrange(DATA_ID_COUNT):
    data_id = str(data_id)
    part = unpack_from(
'>I',
        md5(str(data_id)).digest())[
0] >> PARTITION_SHIFT
    node_ids = [part2node[part]]
    zones = [node2zone[node_ids[
0]]]
    node_counts[node_ids[
0]] += 1
    zone_counts[zones[0]] += 1
for replica in xrange(1, REPLICAS):
        
while part2node[part] in node_ids and \
                node2zone[part2node[part]] in zones:
            part += 
1
if part > PARTITION_MAX:
                part = 
0
        node_ids.append(part2node[part])
        zones.append(node2zone[node_ids[-
1]])
        node_counts[node_ids[-
1]] += 1
        zone_counts[zones[-1]] += 1
desired_count = DATA_ID_COUNT / NODE_COUNT * REPLICAS
print'%d: Desired data ids per node' % desired_count
max_count = max(node_counts)
over = 
100.0 * (max_count - desired_count) / desired_count
print'%d: Most data ids on one node, %.02f%% over' % \
    (max_count, over)
min_count = min(node_counts)
under = 
100.0 * (desired_count - min_count) / desired_count
print'%d: Least data ids on one node, %.02f%% under' % \
    (min_count, under)
desired_count = DATA_ID_COUNT / ZONE_COUNT * REPLICAS

print'%d: Desired data ids per zone' % desired_count
max_count = max(zone_counts)
over = 
100.0 * (max_count - desired_count) / desired_count
print'%d: Most data ids in one zone, %.02f%% over' % \
    (max_count, over)
min_count = min(zone_counts)
under = 
100.0 * (desired_count - min_count) / desired_count
print'%d: Least data ids in one zone, %.02f%% under' % \
    (min_count, under)


117186: Desired data ids per node
118782: Most data ids on one node, 1.36% over
115632: Least data ids on one node, 1.33% under
1875000: Desired data ids per zone
1878533: Most data ids in one zone, 0.19% over
1869070: Least data ids in one zone, 0.32% under

到目前為止,ring的基本功能都已經有了:一致性雜湊ring、partition、partition power、replica、zone。目前還差weight以及將以上程式碼改寫為類封裝成module。

weight

引入weight的概念,目的是“能者多勞”:解決未來新增儲存能力更大的node時,使得可以分配到更多的partition。例如,2T 容量的node的partition數為1T的兩倍。

在ring的構建中,加入了weight屬性。本例中weight簡單地取1和2兩個值,根據每個結點在weight和中的比例分配所需partition數。

from array import array
from hashlib import md5
from random import shuffle
from struct import unpack_from
from time import time


class Ring(object):

    
def__init__(self, nodes, part2node, replicas):
        self.nodes = nodes
        self.part2node = part2node
        self.replicas = replicas
        partition_power = 
1
while2 ** partition_power < len(part2node):
            partition_power += 
1
if len(part2node) != 2 ** partition_power:
            
raise Exception("part2node's length is not an "
"exact power of 2")
        self.partition_shift = 
32 - partition_power

    
defget_nodes(self, data_id):
        data_id = str(data_id)
        part = unpack_from(
'>I',
           md5(data_id).digest())[
0] >> self.partition_shift
        node_ids = [self.part2node[part]]
        zones = [self.nodes[node_ids[
0]]]
        
for replica in xrange(1, self.replicas):
            
while self.part2node[part] in node_ids and \
                   self.nodes[self.part2node[part]] in zones:
                part += 
1
if part >= len(self.part2node):
                    part = 
0
            node_ids.append(self.part2node[part])
            zones.append(self.nodes[node_ids[-
1]])
        
return [self.nodes[n] for n in node_ids]

defbuild_ring(nodes, partition_power, replicas):
    begin = time()
    parts = 
2 ** partition_power
    total_weight = \
        float(sum(n[
'weight'for n in nodes.itervalues()))
    
for node in nodes.itervalues():
        node[
'desired_parts'] = \
            parts / total_weight * node[
'weight']
    part2node = array(
'H')
  
for part in xrange(2 ** partition_power):
        
for node in nodes.itervalues():
            
if node['desired_parts'] >= 1:
                node[
'desired_parts'] -= 1
                part2node.append(node['id'])
                
break
else:
            
for node in nodes.itervalues():
                
if node['desired_parts'] >= 0:
                    node[
'desired_parts'] -= 1
                    part2node.append(node['id'])
                    
break
    shuffle(part2node)
    ring = Ring(nodes, part2node, replicas)
    
print'%.02fs to build ring' % (time() - begin)
    
return ring

deftest_ring(ring):
    begin = time()
    DATA_ID_COUNT = 
10000000
    node_counts = {}
    zone_counts = {}