Zookeeper (五) 源码剖析: Zab 协议
前言
这应该是 ZK 系列的最后一篇博客了。主要是分析在分布式环境下,不同节点的数据的同步问题。在阅读本文前,先假设群首选举已经完成。
概念
1. Zab 共识协议
当接收一个写请求时,follower 会将请求转发给群首,群首将会执行写请求,并将执行结果以事务的方式广播给其他的节点。如何确认一个事务已经提交,这就引入了协议: Zab (Zookeeper Atoimc Broadcast protocol)。
这个协议比较简单,类似于一个两阶段提交 2PC
的事务实现。
- Leader 向所有 Follower 发送一个
提案
消息 p; - Follower 收到 p 消息后,向 Leader 发送一个 ACK 消息,表示已经收到
提案
; - 当收到
仲裁数量
(过半的节点数)的 ACK 消息后,群首向 Follower 发送 COMMIT 操作; - 节点收到 COMMIT 操作后,对
提案
执行事务的提交操作。一次事务完成。
下面部分,将会一步步把协议的实现在代码中寻找出来。
2 代码调试
2.1 Follower 将写请求转发给群首?
在 Follower 服务器从网络层获得的 Request 请求,会先通过 FollowerRequestProcessor ,然后根据 Request 的性质,把包转发给 Leader。
// FollowerRequestProcessor.run()...
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.create2:
case OpCode.createTTL:
case OpCode.createContainer:
case OpCode.delete:
case OpCode.deleteContainer:
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request); // 以上枚举的请求都会被转发到服务器中
break;
case OpCode.createSession:
case OpCode.closeSession:
// Don't forward local sessions to the leader.
if (!request.isLocalSession()) {
zks.getFollower().request(request);
}
break;
}
...
2.2 Leader 收到 Request 后,如何发起提案 Proposal ?
如图,Request 会从 PrepRequestProcessor 转发到 ProposalRequestProcessor 。在 ProposalRequestProcessor 中,假如是 事务
型的请求,将会发起一次提案(如下代码),并且会尝试传递给 SyncRequest
Processor 进行持久化。在 提案 流程上,群首自己也应该返回 ACK 信息,AckRequestProcessor 就是负责向自己返回 ACK 信息。
// ProposalRequestProcessor.java /**
* 这是一个提案专用的处理器
*
* @param request
* @throws RequestProcessorException
*/
publicvoidprocessRequest(Request request)throws RequestProcessorException {
if (request instanceof LearnerSyncRequest) {
zks.getLeader().processSync((LearnerSyncRequest) request);
} else {
nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// 这里假如是一个 事务 类型的 Transactions,就需要执行集群同步。
// We need to sync and get consensus on any transactions
try {
zks.getLeader().propose(request); // 发起 提案,从这里切入
} catch (XidRolloverException e) {
thrownew RequestProcessorException(e.getMessage(), e);
}
syncProcessor.processRequest(request); // 将 Request 持久化起来
}
}
}
// Leader.java
/**
* create a proposal and send it out to all the members
*
* 根据一个 Request 创建一个提案
*
* @param request
* @return the proposal that is queued to send to all the members
*/
public Proposal propose(Request request)throws XidRolloverException {
if (request.isThrottled()) {
LOG.error("Throttled request send as proposal: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
/**
* Address the rollover issue. All lower 32bits set indicate a new leader
* election. Force a re-election instead. See ZOOKEEPER-1277
*/
if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
String msg = "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
thrownew XidRolloverException(msg);
}
// 构建一个 QuorumPacket 仲裁包
byte[] data = SerializeUtils.serializeRequest(request);
proposalStats.setLastBufferSize(data.length);
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this) {
p.addQuorumVerifier(self.getQuorumVerifier());
if (request.getHdr().getType() == OpCode.reconfig) {
self.setLastSeenQuorumVerifier(request.qv, true);
}
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
// 发起一次提案
LOG.debug("发起提案:Proposing:: {}", request);
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
// 发送给所有 Follower
sendPacket(pp);
}
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}
2.3 Follower 如何接收和处理提案信息?
从协议可以知道,Follower 接收到 提案 信息后,会向 Leader 返回一个 ACK 信息。
具体实现会是这样的,Leader 集群的通信层,向 Follower 发送一个 Request ,Request 会通过 queuedRequests 队列被 SyncRequestProcessor 进行消费。
SyncRequestProcessor 消费的时候,会先进行数据同步,然后把 Request 传递到 SendAckRequestProcessor,向 Leader 返回 ACK 信息。
SyncRequestProcessor 因为是通用的,这里不展开代码了,具体的工作就是持久化事务到磁盘中
。这就简单地贴一下 ACK 的代码。
// SendAckRequestProcessor.javapublicvoidprocessRequest(Request si){
if (si.type != OpCode.sync) {
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null, null);
try {
si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
learner.writePacket(qp, false);
} catch (IOException e) {
LOG.warn("Closing connection to leader, exception during packet send", e);
try {
if (!learner.sock.isClosed()) {
learner.sock.close();
}
} catch (IOException e1) {
// Nothing to do, we are shutting things down, so an exception here is irrelevant
LOG.debug("Ignoring error closing the connection", e1);
}
}
}
}
2.4 Leader 如何收集和统计 ACK 信息?
在 Leader 节点中,它和 Follower 通信的类是 LearnerHandler。所以,Leader 收到的 ACK 消息包,可以先通过 LearnerHandler 进行 DEBUG。
LearnerHandler 是一个 Runnable,有一个很长的 run() 方法,如果知识 DEBUG ACK 消息包,可以直接看 ACK 的处理部分。如下:
// LearnerHandler...
switch (qp.getType()) {
case Leader.ACK:
if (this.learnerType == LearnerType.OBSERVER) {
LOG.debug("Received ACK from Observer {}", this.sid);
}
syncLimitCheck.updateAck(qp.getZxid());
learnerMaster.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress()); // 从这里切入
break;
...
// Leader.java
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
* @param zxid, the zxid of the proposal sent out
* @param sid, the id of the server that sent the ack
* @param followerAddr
*/
@Override
publicsynchronizedvoidprocessAck(long sid, long zxid, SocketAddress followerAddr){
if (!allowedToCommit) {
return; // last op committed was a leader change - from now on
}
// the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
if ((zxid & 0xffffffffL) == 0) {
/*
* We no longer process NEWLEADER ack with this method. However,
* the learner sends an ack back to the leader after it gets
* UPTODATE, so we just ignore the message.
*/
return;
}
if (outstandingProposals.size() == 0) {
LOG.debug("outstanding is 0");
return;
}
if (lastCommitted >= zxid) {
// ACK 包过期,直接放弃
LOG.debug(
"proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted),
Long.toHexString(zxid));
// The proposal has already been committed
return;
}
Proposal p = outstandingProposals.get(zxid); // 根据 zxid 获得提案
p.addAck(sid); // 增加一个 ACK
// 尝试提交,假如提交顺利,将会发送 COMMIT 包到所有的 Follower 中
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null) {
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null) {
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
}
2.5 Follower 如何响应 COMMIT 消息包
CommitProcessor 是专门响应 COMMIT 消息包的。其内部维护了几个队列。
- LinkedBlockingQueue queuedRequests; 接受到的 Request 都在这里。
- LinkedBlockingQueue queuedWriteRequests; 正在等待 COMMIT 的 Request,这个队列是 queuedRequest 队列的子集
- LinkedBlockingQueue committedRequests; 已经 COMMIT 的 Request
- Map<Long, Deque< Request >> pendingRequests; sessionId -> Deque< Request > 的映射,这是为了挂起这些请求,等待 COMMIT;
在讨论 CommitProcessor 响应 COMMIT 时,先简单介绍一下这个 Processor 的负责的工作原理。
- 接受到所有的 Request 都会先入队 queuedRequests;
- Request 是事务修改型的,入队 queuedWriteRequests;
- 收到 COMMIT 包时,会将 Request 入队 committedRequests;
- Processor 会消费 committedRequests 的消息包,将 queuedWriteRequests 出队匹配,匹配完成后,把
事务Request
传递到 FinalRequestProcessor,完成事务。
源码篇幅太长,就不贴了,CommitProcessor.java。
后记
终于写完 Zab 协议和实现了。至此,Zookeeper 的核心模块基本分析完毕了。不得不说,Zookeeper 的源码确实还是比较难读的,经常遇到连续四五百行的代码。虽然是一个强悍的工具,我觉得源码方便可以做得更好。
但 Zookeeper 在设计上是非常成功的。以下小结一下:
- 基于 JDK 的 NIO 与客户端的通信设计,向上层提供 Request 通信数据。
- 基于 Processor 责任链实现 Zab 协议,特别是 CommitProcessor 挂起 Request 的操作,非常奇妙;
- 代码中,不少组件都是 Thread 子类,通过生成者-消费者模型进行交流,因此组件可以互相解耦,组合实现不同的角色,如 Leader、Follower等;
Zookeeper 其他文章连接:
- Zookeeper (一) 架设源码调试环境
- ZooKeeper (二) 源码剖析: 群首选举
- ZooKeeper (三) 源码剖析: 服务端网络连接层 NIO
- ZooKeeper (四) 源码剖析:数据模型和存储
参考
- 《ZooKeeper 分布式过程协同技术详解》 Flavio Junqueira Benjamin Reed 著
- 聊聊zookeeper的ZAB算法
以上是 Zookeeper (五) 源码剖析: Zab 协议 的全部内容, 来源链接: utcz.com/a/24660.html