python codis叢集客戶端(二) - 基於zookeeper對例項建立與摘除
阿新 • • 發佈:2022-05-02
在這一篇中我們實現了不通過zk來編寫codis叢集proxys的api,
如果codis叢集暴露zk給你的話,那麼就方便了,探活和故障摘除與恢復codis叢集都給你搞定了,你只需要監聽zookeeper中例項的狀態就好了。
下面看我的實現。
1、CodisByZKPool.py
這裡通過zk讀取並初始化pool_shards,簡單說一下如何故障摘除和恢復
1)我們監聽zk中節點狀態改變,當發現某個例項對應的節點狀態變化了,比如DELETE了,那麼我們認為這個例項掛了,我們就會重新_create_pool重新整理shards列表,摘除故障例項。
2)同樣,當我們發現節點CREATE,就是新增了例項,或者例項從崩潰中恢復了,我們也會重新_create_pool重新整理shards列表,新增例項。
# -*- coding:utf-8 -*- import redis import logging from kazoo.client import KazooClient from Podis import Podis from PickUp import RandomPickUp, PickUp logger = logging.getLogger(__name__) class CodisByZKPool(object): def __init__(self, zk_config): self._pool_shards = [] self.zk_config = zk_config self.zk = self._init_zk() def _init_zk(self): return KazooClient(hosts=self.zk_config.get('hosts'), timeout=self.zk_config.get('timeout')) def _create_pool(self): try: if not self.zk.connected: self.zk.start() address_list = self.zk.get_children(self.zk_config.get('path'), watch=self._watch_codis_instances) for address in address_list: host = address.split(':')[0] port = address.split(':')[1] self._pool_shards.append( Podis( redis.ConnectionPool( host=host, port=port, db=0, password=None, max_connections=None ) ) ) if len(self._pool_shards) == 0: raise Exception('create pool failure!') except Exception, ex: raise finally: self.zk.stop() def _watch_codis_instances(self, event): if event.type == "CREATED" and event.state == "CONNECTED": self._create_pool() elif event.type == "DELETED" and event.state == "CONNECTED": self._create_pool() elif event.type == "CHANGED" and event.state == "CONNECTED": self._create_pool() elif event.type == "CHILD" and event.state == "CONNECTED": self._create_pool() else: logger.error('failure: not cover this event - %s'.format(event.type)) def get_connection(self, pick_up=None): if isinstance(pick_up, PickUp): codisPool = pick_up.pick_up(self._pool_shards) else: pick_up = RandomPickUp() codisPool = pick_up.pick_up(self._pool_shards) return codisPool def get_availables(self): return self._pool_shards
2、負載均衡PickUp.py
跟上一篇一樣,這裡就不多說了。
# -*- coding:utf-8 -*- import abc import uuid import threading class PickUp(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod def __init__(self): pass @abc.abstractmethod def pick_up(self, pool_list): return class RandomPickUp(PickUp): def __init__(self): PickUp.__init__(self) def pick_up(self, pool_list): pool_size = len(pool_list) index = abs(hash(uuid.uuid4())) % pool_size pool = pool_list[index] print "RandomPickUp, 拿到第", index, "個pool" return pool class RoundRobinPickUp(PickUp): def __init__(self): PickUp.__init__(self) self.index = 0 self.round_robin_lock = threading.Lock() def pick_up(self, pool_list): with self.round_robin_lock: pool_size = len(pool_list) self.index += 1 index = abs(self.index) % pool_size pool = pool_list[index] print "RoundRobinPickUp, 拿到第", index, "個pool" return pool
3、配置檔案
這裡就只用zk_config就可以了,我們認為在zk中已經有所有的codisproxy例項的address了。
codis_config = {
'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}
zk_config = {
'hosts': '10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181',
'timeout': 10,
'path': '/codis/instances'
}
4、連結類Podis.py
# -*- coding:utf-8 -*-
import redis
import logging
import traceback
logger = logging.getLogger(__name__)
def redis_getter(func):
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
return result or None
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
def redis_setter(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
return True
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper
class Podis(object):
def __init__(self, pool):
self._connection = redis.StrictRedis(connection_pool=pool)
@redis_getter
def ping(self):
return self._connection.ping()
@redis_getter
def get(self, key):
return self._connection.get(key)
@redis_setter
def set(self, key, value):
self._connection.set(key, value)
@redis_setter
def lpush(self, key, *value):
self._connection.lpush(key, *value)
@redis_getter
def lpop(self, key):
return self._connection.lpop(key)
@redis_getter
def lrange(self, key, start, end):
return self._connection.lrange(key, start, end)
@redis_setter
def sadd(self, key, *value):
self._connection.sadd(key, *value)
@redis_setter
def srem(self, key, *value):
self._connection.srem(key, *value)
@redis_getter
def zrange(self,key,start,end):
return self._connection.zrange(key,start,end)
@redis_getter
def zrevrange(self,key,start,end):
return self._connection.zrevrange(key,start,end,withscores=True)
@redis_getter
def zscore(self,key,*value):
return self._connection.zscore(key,value)
@redis_setter
def zadd(self,key,score,*value):
self._connection.zadd(key,score,value)
@redis_getter
def smembers(self, key):
return self._connection.smembers(key)
@redis_getter
def hgetall(self, key):
return self._connection.hgetall(key)
@redis_getter
def hget(self, key, name):
return self._connection.hget(key, name)
@redis_getter
def hkeys(self, key):
return self._connection.hkeys(key)
@redis_setter
def hset(self, key, name, value):
self._connection.hset(key, name, value)
@redis_setter
def hmset(self, name, mapping):
self._connection.hmset(name, mapping)
@redis_setter
def hdel(self, key, name):
self._connection.hdel(key, name)
@redis_setter
def delete(self, *key):
self._connection.delete(*key)
# codis不支援
@redis_getter
def keys(self, pattern):
return self._connection.keys(pattern)
@redis_setter
def expire(self, key, time):
return self._connection.expire(key, time)
@redis_getter
def ttl(self, key):
return self._connection.ttl(key)
5、例子
import sys
sys.path.append('../')
import time
import threading
from pycodis.CodisConfig import zk_config
from pycodis.CodisByZKPool import CodisByZKPool
from pycodis.PickUp import RoundRobinPickUp
codis_pool1 = CodisByZKPool(zk_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisByZKPool(zk_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------'
def func(i):
for i in range(10):
podis1 = codis_pool1.get_connection(pick_up=pick_up1)
podis2 = codis_pool2.get_connection(pick_up=pick_up2)
podis1.delete(i)
podis2.delete(i)
time.sleep(1)
thread_list = []
for i in range(100):
thread_list.append(threading.Thread(target=func, args=[i]))
for thread in thread_list:
thread.setDaemon(True)
thread.start()
time.sleep(10)