python codis集群客户端(二) - 基于zookeeper对实例创建与摘除

python

在上一篇中,我们实现了不通过zk来编写codis集群proxys的api:http://www.cnblogs.com/kangoroo/p/7481567.html

如果codis集群暴露zk给你的话,那么就方便了,探活和故障摘除与恢复codis集群都给你搞定了,你只需要监听zookeeper中实例的状态就好了。

下面看我的实现。

1、CodisByZKPool.py

这里通过zk读取并初始化pool_shards,简单说一下如何故障摘除和恢复

1)我们监听zk中节点状态改变,当发现某个实例对应的节点状态变化了,比如DELETE了,那么我们认为这个实例挂了,我们就会重新_create_pool刷新shards列表,摘除故障实例。

2)同样,当我们发现节点CREATE,就是新增了实例,或者实例从崩溃中恢复了,我们也会重新_create_pool刷新shards列表,新增实例。

# -*- coding:utf-8 -*-

import redis

import logging

from kazoo.client import KazooClient

from Podis import Podis

from PickUp import RandomPickUp, PickUp

logger = logging.getLogger(__name__)

class CodisByZKPool(object):
    """Set of Podis clients for the codis proxies registered in ZooKeeper.

    Every child of ``zk_config['path']`` is read as a ``host:port`` address
    and gets its own Podis client.  A watch is left on that path so the shard
    list is rebuilt whenever the children change.
    """

    # Watch event types that trigger a rebuild of the shard list.
    _REFRESH_EVENT_TYPES = ("CREATED", "DELETED", "CHANGED", "CHILD")

    def __init__(self, zk_config):
        """Create the ZooKeeper client and build the initial shard list.

        :param zk_config: mapping providing the keys ``hosts``, ``timeout``
            and ``path``.
        :raises Exception: if no instance is registered under ``path``.
        """
        self._pool_shards = []
        self.zk_config = zk_config
        self.zk = self._init_zk()
        # Populate the shards up front; otherwise the list stays empty and
        # get_connection() has nothing to pick from.
        self._create_pool()

    def _init_zk(self):
        """Return a (not yet started) KazooClient built from ``zk_config``."""
        return KazooClient(
            hosts=self.zk_config.get('hosts'),
            timeout=self.zk_config.get('timeout'),
        )

    def _create_pool(self):
        """Rebuild the shard list from the children of the configured path.

        The ZooKeeper session is deliberately left open: the watch passed to
        ``get_children`` belongs to the session, so stopping the client here
        would throw the watch away and no later change would be noticed.
        Call :meth:`close` when the pool is no longer needed.

        :raises Exception: if the path has no children.
        """
        if not self.zk.connected:
            self.zk.start()

        # Watches are one-shot, so each rebuild registers the watch again.
        address_list = self.zk.get_children(
            self.zk_config.get('path'),
            watch=self._watch_codis_instances,
        )

        # Build a fresh list instead of appending to the old one, so removed
        # instances disappear and surviving ones are not duplicated.
        shards = []
        for address in address_list:
            host, port = address.rsplit(':', 1)
            shards.append(
                Podis(
                    redis.ConnectionPool(
                        host=host, port=int(port), db=0,
                        password=None,
                        max_connections=None
                    )
                )
            )

        # Single assignment: readers see either the old or the new list,
        # never a partially built one.
        self._pool_shards = shards

        if not shards:
            raise Exception('create pool failure!')

    def _watch_codis_instances(self, event):
        """Watch callback: rebuild the shards on a change while connected.

        :param event: object exposing ``type`` and ``state`` attributes.
        """
        if event.state == "CONNECTED" and event.type in self._REFRESH_EVENT_TYPES:
            self._create_pool()
        else:
            # NOTE(review): the watch is not registered again on this path,
            # so later changes go unnoticed until the next rebuild.
            logger.error('failure: not cover this event - %s', event.type)

    def get_connection(self, pick_up=None):
        """Return one Podis client chosen by ``pick_up``.

        :param pick_up: a PickUp strategy; anything else (including None)
            falls back to a new RandomPickUp.
        """
        if not isinstance(pick_up, PickUp):
            pick_up = RandomPickUp()
        return pick_up.pick_up(self._pool_shards)

    def get_availables(self):
        """Return the current list of Podis clients."""
        return self._pool_shards

    def close(self):
        """Stop the ZooKeeper client; the shard list is not refreshed afterwards."""
        self.zk.stop()

2、负载均衡PickUp.py

跟上一篇一样,这里就不多说了。

# -*- coding:utf-8 -*-

import abc

import uuid

import threading

class PickUp(object):
    """Base class for strategies that choose one pool out of a list.

    Declared with ``abc.ABCMeta`` as its metaclass; both methods are marked
    abstract, so concrete strategies provide their own versions.
    """

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def __init__(self):
        """Constructor hook; the base version does nothing."""

    @abc.abstractmethod
    def pick_up(self, pool_list):
        """Choose an entry of ``pool_list``; the base version returns None."""
        return None

class RandomPickUp(PickUp):
    """Strategy that returns a randomly chosen entry of the pool list."""

    def __init__(self):
        PickUp.__init__(self)

    def pick_up(self, pool_list):
        """Return one entry of ``pool_list`` chosen at random.

        :param pool_list: non-empty sequence of pools.
        :raises ValueError: if ``pool_list`` is empty (previously this
            surfaced as a ZeroDivisionError from the modulo).
        """
        # Local import: this module does not import logging at file level.
        import logging

        pool_size = len(pool_list)
        if pool_size == 0:
            raise ValueError('RandomPickUp: pool_list is empty')

        # The integer value of a random UUID, reduced to a valid index.
        index = uuid.uuid4().int % pool_size

        # Debug logging instead of printing to stdout on every pick.
        logging.getLogger(__name__).debug('RandomPickUp, 拿到第 %s 个pool', index)
        return pool_list[index]

class RoundRobinPickUp(PickUp):
    """Strategy that walks through the pool list in order, wrapping around."""

    def __init__(self):
        PickUp.__init__(self)
        # Number of picks made so far; the chosen position is index % size.
        self.index = 0
        # Guards the counter so concurrent callers get distinct turns.
        self.round_robin_lock = threading.Lock()

    def pick_up(self, pool_list):
        """Return the next entry of ``pool_list`` in round-robin order.

        The counter is advanced before it is used, so the first pick on a
        list with several entries is the entry at position 1.

        :param pool_list: non-empty sequence of pools.
        :raises ValueError: if ``pool_list`` is empty (previously this
            surfaced as a ZeroDivisionError from the modulo).
        """
        # Local import: this module does not import logging at file level.
        import logging

        with self.round_robin_lock:
            pool_size = len(pool_list)
            if pool_size == 0:
                raise ValueError('RoundRobinPickUp: pool_list is empty')
            self.index += 1
            index = self.index % pool_size
            pool = pool_list[index]

        # Logged after the lock is released so output never delays other callers.
        logging.getLogger(__name__).debug('RoundRobinPickUp, 拿到第 %s 个pool', index)
        return pool

3、配置文件

这里就只用zk_config就可以了,我们认为在zk中已经有所有的codisproxy实例的address了。

# Comma-separated host:port list of codis proxies.  CodisByZKPool does not
# read this dict; it only uses zk_config below.
codis_config = dict(addrs='100.90.186.47:3000,100.90.187.33:3000')

# Settings read by CodisByZKPool: 'hosts' and 'timeout' are handed to
# KazooClient, 'path' is the node whose children are listed.
zk_config = dict(
    hosts=','.join([
        '10.93.21.21:2181',
        '10.93.18.34:2181',
        '10.93.18.35:2181',
    ]),
    timeout=10,
    path='/codis/instances',
)

4、链接类Podis.py

# -*- coding:utf-8 -*-

import redis

import logging

import traceback

logger = logging.getLogger(__name__)

def redis_getter(func):
    """Decorate a read operation: log failures and normalise empty results.

    The wrapper returns ``func``'s result, except that every falsy result
    (``0``, ``''``, ``[]``, ``{}``, ``False`` ...) is returned as ``None``.
    Callers therefore cannot tell a stored zero or empty value from a miss.
    Any exception is logged with its traceback and re-raised unchanged.

    :param func: the callable to wrap.
    :return: the wrapping callable, carrying ``func``'s name and docstring.
    """
    # Local import: functools is not imported at file level.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs) or None
        except Exception:
            logger.error(traceback.format_exc())
            raise

    return wrapper

def redis_setter(func):
    """Decorate a write operation: return True on success, log failures.

    The wrapped callable's own return value is discarded; the wrapper
    returns ``True`` whenever ``func`` finishes without raising.  Any
    exception is logged with its traceback and re-raised unchanged.

    :param func: the callable to wrap.
    :return: the wrapping callable, carrying ``func``'s name and docstring.
    """
    # Local import: functools is not imported at file level.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception:
            logger.error(traceback.format_exc())
            raise
        return True

    return wrapper

class Podis(object):
    """Thin wrapper around ``redis.StrictRedis`` bound to one connection pool.

    Read methods are decorated with ``redis_getter`` and write methods with
    ``redis_setter``; see those decorators for how results and errors are
    reported to the caller.
    """

    def __init__(self, pool):
        """
        :param pool: connection pool handed to ``redis.StrictRedis``.
        """
        self._connection = redis.StrictRedis(connection_pool=pool)

    @redis_getter
    def ping(self):
        return self._connection.ping()

    @redis_getter
    def get(self, key):
        return self._connection.get(key)

    @redis_setter
    def set(self, key, value):
        self._connection.set(key, value)

    @redis_setter
    def lpush(self, key, *value):
        self._connection.lpush(key, *value)

    @redis_getter
    def lpop(self, key):
        return self._connection.lpop(key)

    @redis_getter
    def lrange(self, key, start, end):
        return self._connection.lrange(key, start, end)

    @redis_setter
    def sadd(self, key, *value):
        self._connection.sadd(key, *value)

    @redis_setter
    def srem(self, key, *value):
        self._connection.srem(key, *value)

    @redis_getter
    def zrange(self, key, start, end):
        return self._connection.zrange(key, start, end)

    @redis_getter
    def zrevrange(self, key, start, end):
        # Unlike zrange above, this one asks for the scores as well.
        return self._connection.zrevrange(key, start, end, withscores=True)

    @redis_getter
    def zscore(self, key, *value):
        """Return the score of a single member of the sorted set ``key``.

        :param value: exactly one member.
        :raises TypeError: if zero or several members are given.
        """
        # The member must be passed on its own; handing over the whole
        # ``value`` tuple would look up the tuple instead of the member.
        if len(value) != 1:
            raise TypeError(
                'zscore() takes exactly one member, got %d' % len(value))
        return self._connection.zscore(key, value[0])

    @redis_setter
    def zadd(self, key, score, *value):
        """Add every member in ``value`` to sorted set ``key`` with ``score``."""
        # Send ZADD key score member [score member ...] directly, so the call
        # does not depend on the zadd() signature of the installed redis-py.
        pairs = []
        for member in value:
            pairs.extend((score, member))
        self._connection.execute_command('ZADD', key, *pairs)

    @redis_getter
    def smembers(self, key):
        return self._connection.smembers(key)

    @redis_getter
    def hgetall(self, key):
        return self._connection.hgetall(key)

    @redis_getter
    def hget(self, key, name):
        return self._connection.hget(key, name)

    @redis_getter
    def hkeys(self, key):
        return self._connection.hkeys(key)

    @redis_setter
    def hset(self, key, name, value):
        self._connection.hset(key, name, value)

    @redis_setter
    def hmset(self, name, mapping):
        self._connection.hmset(name, mapping)

    @redis_setter
    def hdel(self, key, name):
        self._connection.hdel(key, name)

    @redis_setter
    def delete(self, *key):
        self._connection.delete(*key)

    # Not supported by codis
    @redis_getter
    def keys(self, pattern):
        return self._connection.keys(pattern)

    @redis_setter
    def expire(self, key, time):
        # No return here: redis_setter discards it and reports True anyway.
        self._connection.expire(key, time)

    @redis_getter
    def ttl(self, key):
        return self._connection.ttl(key)

5、例子

import sys

sys.path.append('../')

import time

import threading

from pycodis.CodisConfig import zk_config

from pycodis.CodisByZKPool import CodisByZKPool

from pycodis.PickUp import RoundRobinPickUp

# Two independent pools, each with its own round-robin strategy.
codis_pool1 = CodisByZKPool(zk_config)
print('------1-------')
pick_up1 = RoundRobinPickUp()
print('------2-------')
codis_pool2 = CodisByZKPool(zk_config)
print('------3-------')
pick_up2 = RoundRobinPickUp()
print('------4-------')


def func(i):
    """Worker: delete keys 0..9 through both pools, one key per second.

    :param i: worker number supplied by the launcher; not used in the body.
    """
    # A separate loop name keeps the parameter from being overwritten.
    for key in range(10):
        podis1 = codis_pool1.get_connection(pick_up=pick_up1)
        podis2 = codis_pool2.get_connection(pick_up=pick_up2)
        podis1.delete(key)
        podis2.delete(key)
        time.sleep(1)


thread_list = [threading.Thread(target=func, args=[i]) for i in range(100)]

for thread in thread_list:
    thread.daemon = True
    thread.start()

# Each worker needs about ten seconds, so a fixed sleep(10) could end the
# process while daemon threads were still working.  Wait for them instead,
# but give up after a deadline so a stuck connection cannot hang the script.
deadline = time.time() + 30
for thread in thread_list:
    thread.join(max(0, deadline - time.time()))

以上是 python codis集群客户端(二) - 基于zookeeper对实例创建与摘除 的全部内容, 来源链接: utcz.com/z/389483.html

回到顶部