关于Kazoo
Kazoo是一个ZooKeeper的python客户端,它是一个纯python的应用,并在底层实现了与zookeeper通讯协议。
它主要有这些特性
- 底层通信异步化
- 基于Python装饰器,提供一次watch、终身受用的功能
- 提供 gevent/eventlet 兼容功能
其他的ZooKeeper-Python客户端[1]:
- txzookeeper 基于Twisted/Python(对并发部分进行过更改的Python版本),2013年停止维护
- zc.zk 2014年停止维护,zc.zk2 开始使用 kazoo
关于本文
- 本文基于 kazoo 2.6.1 版本代码进行分析总结。
- 本文中提到的“异步结构”,是根据handler选择变化的,在SequentialThreadingHandler中是异步线程,而在SequentialGeventHandler中是异步协程。
基本使用方式
启动客户端
1
2
3
4zk = KazooClient(hosts="127.0.0.1:2181")
# can be multi hosts, like
# zk = KazooClient(hosts="192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181")
zk.start()CURD 命令
1
2
3
4
5
6
7
8
9
10
11
12# Ensure a path, create if necessary
zk.ensure_path("/path/to/my/node")
# Create a node with data
zk.create("/my/favorite/node", b"a value")
# read api
data, stat = zk.get("/my/favorite")
children = zk.get_children("/my/favorite")
if zk.exists("/my/favorite"):
# Do something
passwatch 监听
- 方式1:采用ZooKeeper提供的默认watch监听事件,可使用的函数:
- get()
- get_children()
- exists()
1
2
3
4
5def my_func(event):
# check to see what the children are now
# Call my_func when the children change
children = zk.get_children("/my/favorite/node", watch=my_func)
- 方式2:采用kazoo提供的Watch装饰器,可使用的监听器有
- ChildrenWatch 观察子节点变化
- DataWatch 观察节点值的变化
1
2
3
def watch_children(children):
print("Children are now: %s" % children)
Retry
在kazoo中,重试机制是通过抽象出一个类来进行控制的,这个类就是KazooRetry。KazooRetry实现了call函数,因此可以被直接当做函数使用。使用方式如下:1
2
3
4
5
6
7
8
9
10result = zk.retry(zk.get, "/path/to/node")
# Custom Retries
kr = KazooRetry(max_tries=3, ignore_expire=False)
result = kr(client.get, "/some/path")
# init client using Retry
rt = KazooRetry(max_tries=3, ignore_expire=False)
zk = KazooClient(hosts="127.0.0.1:2181", connection_retry=rt, command_retry=rt)
zk.start()需要特别指出的是,在KazooRetry中会捕获一种ForceRetryError的异常来触发重试,这个机制被connection用于连接轮询。
Kazoo 状态转换
Source/Target | LOST | CONNECTED | SUSPEND |
---|---|---|---|
LOST | 客户端初始化即为LOST状态 | 成功连接ZK服务端后,转移到CONNECTED | |
CONNECTED | 只有鉴权失败会返回LOST状态 | ZK Server 出现问题,重连ZK节点,先转移到SUSPEND | |
SUSPEND | 重建连接失败,如Session过期 | 重建连接成功,且鉴权有效 |
Kazoo 代码原理浅析
Kazoo底层通信用异步实现,但也提供一些同步命令,这些同步命令是在异步基础上增加等待来模拟同步的。
异步的实现方式可以通过handler类型进行选择。Kazoo提供了3种Handler:SequentialThreadingHandler、SequentialGeventHandler、SequentialEventletHandler,分别用线程、gevent、eventlet的方式来实现异步。
以下我们将kazoo的主要模块拆开,分别讲解原理,然后通过一个kazoo.get()命令的流程来说明kazoo具体是如何工作的。
P.S. 如果觉得代码太多不想细究,只想理解大致原理,可跳过中间的模块解析部分,只看模块架构和get工作流程~
模块架构
Kazoo 主要有这3大模块:client、异步handler、连接connection,其关系如上图所示。
- client:管理配置项,并对外提供get、set等操作命令;
- connection:通过handler提供的异步功能,与ZK进行通信,而通信过程会经过kazoo自己实现的serialization方法进行;
- handler:管理异步队列和异步操作,kazoo客户端初始化,后主要有2个异步队列和1个异步结果操作:
- zk_loop:connection模块通过异步轮询的方式与ZK Server进行通信,处理request和response;
- AsyncResult:异步结果,不同的handler有不同的实现,SequentialGeventHandler中直接调用gevent.event.AsyncResult,在SequentialThreadingHandler中kazoo模拟了一个异步请求,并通过completion_queue队列的线程处理异步结果的callback;
- callback_queue:处理watch回调。
Connection
connection模块中有ConnectionHandler类,用于处理与ZK的异步连接。我们来看看start函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56def start(self):
"""Start the connection up"""
if self.connection_closed.is_set():
rw_sockets = self.handler.create_socket_pair() # 调用本地方法
self._read_sock, self._write_sock = rw_sockets # 获得一个 socket pair
self.connection_closed.clear()
if self._connection_routine: # 验证当前是否存在异步通信线程/协程
raise Exception("Unable to start, connection routine already "
"active.")
self._connection_routine = self.handler.spawn(self.zk_loop)
# 启动一个异步通信线程/协程
# ....
def zk_loop(self):
"""Main Zookeeper handling loop"""
self.logger.log(BLATHER, 'ZK loop started')
self.connection_stopped.clear()
retry = self.retry_sleeper.copy() # 获取client的retry方法
try:
while not self.client._stopped.is_set(): # 注意,轮询不是靠这个while进行的,而是靠下面的retry
# If the connect_loop returns STOP_CONNECTING, stop retrying
if retry(self._connect_loop, retry) is STOP_CONNECTING: # 主要功能函数,连接轮询,处理请求和反馈
break # 默认只会retry 1次
except RetryFailedError:
self.logger.warning("Failed connecting to Zookeeper "
"within the connection retry policy.")
finally:
self.connection_stopped.set()
self.client._session_callback(KeeperState.CLOSED)
self.logger.log(BLATHER, 'Connection stopped')
# ...
def _connect_loop(self, retry):
# Iterate through the hosts a full cycle before starting over
status = None
host_ports = self._expand_client_hosts() # 获取 zk_server list
# Check for an empty hostlist, indicating none resolved
if len(host_ports) == 0:
return STOP_CONNECTING
for host, hostip, port in host_ports: # 对于每个 zk_server,轮询尝试建立连接
if self.client._stopped.is_set():
status = STOP_CONNECTING
break
status = self._connect_attempt(host, hostip, port, retry) # 处理request和response的主体函数
if status is STOP_CONNECTING:
break
if status is STOP_CONNECTING: # 如果当前连接断掉,则返回
return STOP_CONNECTING
else:
raise ForceRetryError('Reconnecting') # 抛出异常,强行触发上层的retry,实现轮询
可以看到,connection调用start()启动之后,进行了3步主要操作:
- 生成一个 socket_pair;
- 验证当前是否已经有异步结构,如果有,则抛出异常并退出;
- 通过client的异步handler.spawn()生成一个异步线程,然后通过zk_loop函数与ZK Server进行通信。
在zk_loop函数中,通过while循环进行轮询,通过_connect_loop处理请求信息。默认retry只会重试1次,重试之前会默认sleep一个随机时间。
在_connect_loop函数中,拿到zk_server的host list,并通过_connect_attempt函数,逐一尝试建立连接。_connect_attempt函数中会进行如下操作:
- 收集当前命令,通过写socket发送请求给ZK Server;
- 如果当前没有命令,发送一个ping过去,保持连接活性;
- 通过_read_socket函数,从读socket接受response;
- 在_read_socket函数中,把返回值写入异步结果AsyncResult,如果返回值中存在watch,则调用相应的callback。
关于connection机制的介绍,可参考引用[3],介绍的很详细。
Handler
Kazoo使用handler来进行异步操作,提供了3种异步操作的handler:SequentialThreadingHandler、SequentialGeventHandler、SequentialEventletHandler,分别用线程、gevent、eventlet的方式来实现异步。
由于默认使用SequentialThreadingHandler提供异步方案,我们以SequentialThreadingHandler来进行分析。
先看看handler提供的函数接口:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53def start(self):
"""Start the worker threads."""
# 代码略
def stop(self):
"""Stop the worker threads and empty all queues."""
# 代码略
def select(self, *args, **kwargs):
# if we have epoll, and select is not expected to work
# use an epoll-based "select". Otherwise don't touch
# anything to minimize changes
# 代码略
def socket(self):
return utils.create_tcp_socket(socket)
def create_connection(self, *args, **kwargs):
return utils.create_tcp_connection(socket, *args, **kwargs)
def create_socket_pair(self):
return utils.create_socket_pair(socket)
def event_object(self):
"""Create an appropriate Event object"""
return threading.Event()
def lock_object(self):
"""Create a lock object"""
return threading.Lock()
def rlock_object(self):
"""Create an appropriate RLock object"""
return threading.RLock()
def async_result(self):
"""Create a :class:`AsyncResult` instance"""
return AsyncResult(self)
def spawn(self, func, *args, **kwargs):
t = threading.Thread(target=func, args=args, kwargs=kwargs)
t.daemon = True
t.start()
return t
def dispatch_callback(self, callback):
"""Dispatch to the callback object
The callback is put on separate queues to run depending on the
type as documented for the :class:`SequentialThreadingHandler`.
"""
self.callback_queue.put(lambda: callback.func(*callback.args))
spawn() 函数
这些函数接口中,最重要的函数是spawn(),它经常在各处被调用,用于提供异步的功能。具体在SequentialThreadingHandler中,spawn()会调用python的threading模块,生成一个线程,并将线程设置为守护线程。start() 函数
kazoo客户端初始化后会调用start()函数,启动异步。在SequentialThreadingHandler中,start()函数会初始化2个队列,并为其各自生成一个线程进行消费:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33def start(self):
"""Start the worker threads."""
with self._state_change:
if self._running:
return
# Spawn our worker threads, we have
# - A callback worker for watch events to be called 用于唤醒callback
# - A completion worker for completion events to be called 用于设置异步结果AsyncResult的值
for queue in (self.completion_queue, self.callback_queue):
w = self._create_thread_worker(queue) # 此处将调用_create_thread_worker生成2个worker,持续消费队列
self._workers.append(w)
self._running = True
python2atexit.register(self.stop)
def _create_thread_worker(self, queue):
def _thread_worker(): # pragma: nocover 消费队列的函数
while True:
try:
func = queue.get()
try:
if func is _STOP:
break
func()
except Exception:
log.exception("Exception in worker queue thread")
finally:
queue.task_done()
del func # release before possible idle
except self.queue_empty:
continue
t = self.spawn(_thread_worker) # 生成消费队列的线程
return tevent_object() 函数
生成一个锁,用在并发安全控制。由于需要考虑在gevent/eventlet并发模型下的锁控制,所以将锁生成抽象到handler来处理。在SequentialThreadingHandler中直接返回threading.Event()。
Watch
Kazoo提供了2种watch的方式,我们分别介绍其原理。
- Default watch
该方式通过将watch的回调函数作为参数,传递给get()、get_children()和exists()接口完成watch的功能。
以get()为例,通过源码,可以看到get()函数的操作流程如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19def get(self, path, watch=None):
"""Get the value of a node.
.. (the doc) ..
"""
return self.get_async(path, watch=watch).get()
def get_async(self, path, watch=None):
"""
.. (the doc) ..
"""
if not isinstance(path, string_types): # 进行参数类型校验
raise TypeError("Invalid type for 'path' (string expected)")
if watch and not callable(watch):
raise TypeError("Invalid type for 'watch' (must be a callable)")
async_result = self.handler.async_result()
self._call(GetData(_prefix_root(self.chroot, path), watch), # GetData 会将get请求序列化成命令,用于与ZK Server直接通信
async_result)
return async_result
实际上,watch的步骤如下:
1. get()会调用get_async(),将参数和回调函数watcher传递过去;
2. 而get_async()函数会将get命令与watcher封装,通过_call()函数放入自身的请求队列;
3. 后面的流程一分为二:一方面,序列化的命令会将watch的标志位置为1,而在处理请求的时候watcher函数会被放到client的watcher中;
4. 当读到的请求发回一个需要唤醒watcher的路径时,会调用client中对应的watcher,执行回调函数。
- Decorator watch
Kazoo提供了2个watch装饰器,使用装饰器可以达到一次watch永远生效的作用。一般情况下ZooKeeper的watch事件只会生效1次,可以想象,这里其实只是一个语法糖:在初始化装饰器时注册一次watch,当触发watch事件后,再注册同样的watch即可。
原理很简单,但是具体的实现还是有学习价值的。以DataWatch为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38def __init__(self, client, path, func=None, *args, **kwargs):
# ...
# 篇幅有限,此处省略了很多文档和代码
# ...
if func is not None:
self._used = True
self._client.add_listener(self._session_watcher) # 注册session listener,当状态改变时触发,下同
self._get_data() # 获取数据
def __call__(self, func):
# ...
# 篇幅有限,此处省略了很多文档和代码
# ...
self._client.add_listener(self._session_watcher)
self._get_data()
return func
def _get_data(self, event=None):
# ...
# 篇幅有限,此处省略了很多文档和代码
# ...
try:
data, stat = self._retry(self._client.get,
self._path, self._watcher) # 通过retry,调用get命令,并将watcher注册
except NoNodeError:
data = None
# This will set 'stat' to None if the node does not yet
# exist.
stat = self._retry(self._client.exists, self._path, # 如果当前node不存在,会先尝试watch节点
self._watcher) # 然后生成一个异步结构来持续监听节点data变化
if stat:
self._client.handler.spawn(self._get_data)
return
# ...
# 码多不看
# ...
通信流程
至此,kazoo的主要模块介绍完毕。下面通过一个get()命令的执行顺序,将kazoo工作机制展现出来。不多说,上图:
Annotations & References:
[1] ZKClient Bindings
[2] Kazoo Official
[3] zookeeper client API实现(python kazoo 的实现)