python codis叢集客戶端(一) - 基於客戶端daemon探活與服務列表維護
在使用codis時候,我們遇到的場景是,公司提供了HA的Proxy(例如N個),但是不暴露zookeeper(也就是說沒有codis後端服務列表)。
如果暴露zk的話,可以看這一篇
要求在開發客戶端api的過程中,自己進行探活&故障摘除&負載均衡。
我這裡做了一個簡單的實現,提供給大家參考。本例項支援使用在server或者daemon中。
我們的實現叫做pycodis。
1、核心檔案CodisPool.py
在這個檔案裡,實現了多服務端例項連結,探活與故障摘除,負載均衡策略(隨機&roundrobin)。
1)探活
由於公司不暴露CodisProxy的zookeeper,我們在編寫客戶端程式的時候無法獲取活躍的codisProxy服務列表。我們不得不自己去探活並且儲存活躍的服務列表。
我們通過在初始化CodisPool例項時候,啟動了後臺執行緒_check_available_backgroud,去輪詢配置的codisProxy列表,我們的探活與摘除滿足以下3個要求:
a)如果在一定的閾值時間段內(預設3s),某個codisProxy始終無法提供服務,就暫時將它從codisProxy列表中摘除;
b)被摘除的codisProxy例項無法通過get_connection得到,但是在codisProxy列表中保留它,讓它可以在滿足條件的情況下復起;
c)當被摘除的codisProxy恢復了,就把它放到可用codisProxy列表中,這樣通過get_connetion又能拿到它。
注:這裡通過get_connection拿到的proxy,其實現方式是redis連結池。
2)負載均衡
我們通過get_connection這個函式在proxy列表中得到一個可用連結,那麼獲取可用連結的負載均衡演算法是怎樣的呢?
PickUp抽象類,定義了負載均衡類需要實現的方法,pick_up()。
我們實現了兩種負載均衡演算法:
a)RandomPickUp,隨機負載均衡
b)RoundRobinPickUp,輪詢負載均衡
3)單例模式的保證
為了不至於在程式執行時建立很多的連結池例項,這是違反設計初衷的,連結池就沒有意義了,我們需要實現為執行緒安全的單例模式。
在CodisPool實現中只保證了不會額外建立太多的連結池--我們在__init__中一次性建立好了Podis例項,並且放入到proxy列表中。單例模式留待使用中去實現,最簡單的方式是,在python的module中先把CodisPool的例項建立好。然後其他的執行緒在去訪問。可以參考我們的example例項。
# -*- coding:utf-8 -*-
import abc
import uuid
import time
import logging
import traceback
import threading
import redis
from Podis import Podis
logger = logging.getLogger(__name__)
class CodisPool(object):
def __init__(self, codis_config):
self._pool_shards = []
self._availables = []
self._connection = None
self._create_pool(codis_config)
self._check_available_backgroud()
def _check_available_backgroud(self):
bg = Background(self._check_pool_shards)
bg.start()
def _create_pool(self, codis_config):
address_list = codis_config.get('addrs').split(',')
for address in address_list:
host = address.split(':')[0]
port = address.split(':')[1]
self._pool_shards.append(
Podis(
redis.ConnectionPool(
host=host, port=port, db=0,
password=codis_config.get('password'),
max_connections=codis_config.get('max_connections')
)
)
)
self._availables.append(True)
if len(self._pool_shards) == 0:
logger.error('建立codis連結池失敗')
raise Exception('建立codis連結池失敗')
def _check_pool_shards(self):
while True:
self._pool_shards_is_available()
def _pool_shards_is_available(self, retry_num=3):
i = 0
for pool in self._pool_shards:
try:
retry = retry_num
go_on = True
while go_on and retry > 0:
try:
pong = pool.ping()
if not pong:
retry -= 1
else:
go_on = False
except Exception, ex:
retry -= 1
raise
finally:
time.sleep(1)
if retry <= 0:
self._availables[i] = False
else:
self._availables[i] = True
except Exception, ex:
logger.error(traceback.format_exc())
finally:
i += 1
def _get_available_shards(self):
i = 0
available_shards = []
for shard in self._pool_shards:
if self._availables[i]:
available_shards.append(shard)
i += 1
return available_shards
def get_connection(self, pick_up=None):
if isinstance(pick_up, PickUp):
codisPool = pick_up.pick_up(self._get_available_shards())
else:
pick_up = RandomPickUp()
codisPool = pick_up.pick_up(self._get_available_shards())
return codisPool
def get_pool_shards(self):
return self._pool_shards
def get_availables(self):
return self._get_available_shards()
class Background(object):
def __init__(self, target, daemon=True):
self.daemon = daemon
self.thread = threading.Thread(target=target)
def start(self):
self.thread.setDaemon(self.daemon)
self.thread.start()
class PickUp(object):
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self):
pass
@abc.abstractmethod
def pick_up(self, pool_list):
return
class RandomPickUp(PickUp):
def __init__(self):
PickUp.__init__(self)
def pick_up(self, pool_list):
pool_size = len(pool_list)
index = abs(hash(uuid.uuid4())) % pool_size
pool = pool_list[index]
print "RandomPickUp, 拿到第", index, "個pool"
return pool
class RoundRobinPickUp(PickUp):
def __init__(self):
PickUp.__init__(self)
self.index = 0
self.round_robin_lock = threading.Lock()
def pick_up(self, pool_list):
with self.round_robin_lock:
pool_size = len(pool_list)
self.index += 1
index = abs(self.index) % pool_size
pool = pool_list[index]
print "RoundRobinPickUp, 拿到第", index, "個pool"
return pool
2、Podis,實際的控制代碼資源
在CodisPool獲得的控制代碼就是本類的一個例項。
# -*- coding:utf-8 -*-
import redis
import logging
import traceback
logger = logging.getLogger(__name__)
def redis_getter(func):
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
return result or None
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
def redis_setter(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
return True
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
class Podis(object):
def __init__(self, pool):
self._connection = redis.StrictRedis(connection_pool=pool)
@redis_getter
def ping(self):
return self._connection.ping()
@redis_getter
def get(self, key):
return self._connection.get(key)
@redis_setter
def set(self, key, value):
self._connection.set(key, value)
@redis_setter
def lpush(self, key, *value):
self._connection.lpush(key, *value)
@redis_getter
def lpop(self, key):
return self._connection.lpop(key)
@redis_getter
def lrange(self, key, start, end):
return self._connection.lrange(key, start, end)
@redis_setter
def sadd(self, key, *value):
self._connection.sadd(key, *value)
@redis_setter
def srem(self, key, *value):
self._connection.srem(key, *value)
@redis_getter
def zrange(self,key,start,end):
return self._connection.zrange(key,start,end)
@redis_getter
def zrevrange(self,key,start,end):
return self._connection.zrevrange(key,start,end,withscores=True)
@redis_getter
def zscore(self,key,*value):
return self._connection.zscore(key,value)
@redis_setter
def zadd(self,key,score,*value):
self._connection.zadd(key,score,value)
@redis_getter
def smembers(self, key):
return self._connection.smembers(key)
@redis_getter
def hgetall(self, key):
return self._connection.hgetall(key)
@redis_getter
def hget(self, key, name):
return self._connection.hget(key, name)
@redis_getter
def hkeys(self, key):
return self._connection.hkeys(key)
@redis_setter
def hset(self, key, name, value):
self._connection.hset(key, name, value)
@redis_setter
def hmset(self, name, mapping):
self._connection.hmset(name, mapping)
@redis_setter
def hdel(self, key, name):
self._connection.hdel(key, name)
@redis_setter
def delete(self, *key):
self._connection.delete(*key)
# codis不支援
@redis_getter
def keys(self, pattern):
return self._connection.keys(pattern)
@redis_setter
def expire(self, key, time):
return self._connection.expire(key, time)
@redis_getter
def ttl(self, key):
return self._connection.ttl(key)
3、配置檔案CodisConfig.py
這裡我沒有對最大連線數,超時時間等等做配置,你可以根據你的場景自行新增。
codis_config = {
'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}
4、單測和使用
我們模擬在併發場景下,對資源的獲得和釋放情況。
import time
import threading
from pycodis.CodisConfig import codis_config
from pycodis.CodisPool import CodisPool, RoundRobinPickUp
codis_pool1 = CodisPool(codis_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisPool(codis_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------'
def func(i):
for i in range(10):
podis1 = codis_pool1.get_connection(pick_up=pick_up1)
podis2 = codis_pool2.get_connection(pick_up=pick_up2)
podis1.delete(i)
podis2.delete(i)
time.sleep(1)
thread_list = []
for i in range(100):
thread_list.append(threading.Thread(target=func, args=[i]))
for thread in thread_list:
thread.setDaemon(True)
thread.start()
time.sleep(10)
可以看到列印的資訊,每次都不一樣
------1-------
------2-------
------3-------
------4-------
RoundRobinPickUp, 拿到第 1 個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 01 個pool
個pool
RoundRobinPickUp, 拿到第 0 個poolRoundRobinPickUp, 拿到第
1 個pool
RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第 個pool
0 個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第0 個pool
1 個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 10 個pool
RoundRobinPickUp, 拿到第個pool
1RoundRobinPickUp, 拿到第 個pool
RoundRobinPickUp, 拿到第 00 個pool個pool
RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第 個pool
1 個poolRoundRobinPickUp, 拿到第
0RoundRobinPickUp, 拿到第 0 個pool
RoundRobinPickUp, 拿到第 1 個pool
個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 01 個pool
個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 10 個pool個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 1 個pool
0 RoundRobinPickUp, 拿到第 個pool
RoundRobinPickUp, 拿到第 0 個pool1
RoundRobinPickUp, 拿到第個pool
1 個poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 0 0個pool
RoundRobinPickUp, 拿到第 個pool1
個poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 10 個pool 個pool
RoundRobinPickUp, 拿到第 0RoundRobinPickUp, 拿到第 個pool
1 個poolRoundRobinPickUp, 拿到第 1 個pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 0 個pool
0 個pool
RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第 1 個pool
1 個poolRoundRobinPickUp, 拿到第
0 個pool
RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第 0 個pool
RoundRobinPickUp, 拿到第 11 個pool個pool