利用过年这几天的一些空余时间,把《从PAXOS到ZOOKEEPER分布式一致性原理与实践》看了一遍。对ZooKeeper有了一个大致的了解。ZooKeeper作为Chubby的开源实现,在当前的分布式生产环境中有着广泛的应用。使用ZooKeeper可以方便的实现Dynamic DNS、微服务架构中的服务路由以及为其他分布式系统提供支撑(如Kafka, HBase)。
作为个人关于ZooKeeper的第一篇blog,我准备分析一下ZooKeeper的事务流程。事务处理是ZooKeeper的一大重要功能。在ZooKeeper事务流程实现中,所有的事务请求都需要从Leader Server协调完成。因此,从ZooKeeper的Leader Sever来分析事务流程更为简单。
首先分析LeaderZooKeeperServer类,它是ZooKeeperServer的子类,一个该类型的实例其实就对应一个leader服务器。这个类实现了一个方法,其中包含了重要信息。
protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor( finalProcessor, getLeader().toBeApplied); commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false); commitProcessor.start(); ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this, commitProcessor); proposalProcessor.initialize(); firstProcessor = new PrepRequestProcessor(this, proposalProcessor); ((PrepRequestProcessor)firstProcessor).start(); }
这段代码其实内容很简单,却非常重要。这里采用了类似于“责任链”的设计模式,该函数的作用就是把处理请求的逻辑链建立起来。“责任链”的确是建立起来了,那么这条链是何时被触发的呢?LeaderZooKeeperServer是QuorumZooKeeperServer的直接子类。该类有一个QuorumPeer类型的成员self,而self有有一个Leader类型的成员leader。这个leader类型的变量完成了网络交互方面的工作,也是触发前文中“责任链”的地方。对于Leader的实现,我们有必要再看一段代码,首先是Leader的构造函数。
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException { this.self = self; try { if (self.getQuorumListenOnAllIPs()) { ss = new ServerSocket(self.getQuorumAddress().getPort()); } else { ss = new ServerSocket(); } ss.setReuseAddress(true); if (!self.getQuorumListenOnAllIPs()) { ss.bind(self.getQuorumAddress()); } } catch (BindException e) { if (self.getQuorumListenOnAllIPs()) { LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e); } else { LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); } throw e; } this.zk=zk; }
从这里就看出来了,Leader其实就是一个服务器,通过服务器套接字接受客户端和Follower的连接。涉及到套接字通信,其实重点关注收发的逻辑就好。对于接收和发送功能,Leader主要通过LearnerHandler来实现。首先来看接收功能吧!简略地贴点代码
while (true) { qp = new QuorumPacket(); ia.readRecord(qp, "packet"); /**略**/ switch (qp.getType()) { /**略**/ case Leader.REQUEST: bb = ByteBuffer.wrap(qp.getData()); sessionId = bb.getLong(); cxid = bb.getInt(); type = bb.getInt(); bb = bb.slice(); Request si; if(type == OpCode.sync){ si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); } else { si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); } si.setOwner(this); leader.zk.submitRequest(si); break; } /**略**/ }
LearnerHandler其实是Thread的子类,而上面的这段代码其实是它的run方法。这段代码的死循环就是请求处理的核心代码。如果QuorumPacket的类型是Request,那么将对报文进行简单的处理,然后提交给leader成员所指向的LeaderZooKeeperServer。而这个submitRequest()函数才是真正地实现了“责任链”的触发。而这个提交请求的函数做的事情也很简单,就是把request放到一个请求队列中。
Leader在ProposalRequestProcessor中会根据事务性请求生成对应的Proposal,同时把这个提议广播出去。在更进一步之前,是需要先等待followers的应答的。对于ack的应答的处理逻辑也在LearnerHandler中,而LearnerHandler也只是简单的把ack包提交给Leader的processAck()函数。processAck()函数的代码片段如下:
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) { if (outstandingProposals.size() == 0) { return; } Proposal p = outstandingProposals.get(zxid); p.ackSet.add(sid); if (self.getQuorumVerifier().containsQuorum(p.ackSet)){ outstandingProposals.remove(zxid); if (p.request != null) { toBeApplied.add(p); } commit(zxid); inform(p); zk.commitProcessor.commit(p.request); if(pendingSyncs.containsKey(zxid)){ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) { sendSync(r); } } } }
这个函数的重要代码都被我筛选出来了,看起来并不复杂。从outstandingProposal中找到对应的proposal,然后把follow服务器的sid添加到这个proposal的ackSet中。然后判断是否已经有过半的服务器通过了该proposal,如果条件为真,那么就把这个proposal提交给commitProcessor。
而后的故事就在下一篇博客中再分析了。
发表评论