ZooKeeper事务流程源码分析<1>

= 1456

本文出自:【InTheWorld的博客】

ZooKeeper

利用过年这几天的一些空余时间,把《从PAXOS到ZOOKEEPER分布式一致性原理与实践》看了一遍。对ZooKeeper有了一个大致的了解。ZooKeeper作为Chubby的开源实现,在当前的分布式生产环境中有着广泛的应用。使用ZooKeeper可以方便的实现Dynamic DNS、微服务架构中的服务路由以及为其他分布式系统提供支撑(如Kafka, HBase)。

作为个人关于ZooKeeper的第一篇blog,我准备分析一下ZooKeeper的事务流程。事务处理是ZooKeeper的一大重要功能。在ZooKeeper事务流程实现中,所有的事务请求都需要从Leader Server协调完成。因此,从ZooKeeper的Leader Sever来分析事务流程更为简单。

首先分析LeaderZooKeeperServer类,它是ZooKeeperServer的子类,一个该类型的实例其实就对应一个leader服务器。这个类实现了一个方法,其中包含了重要信息。

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false);
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

这段代码其实内容很简单,却非常重要。这里采用了类似于“责任链”的设计模式,该函数的作用就是把处理请求的逻辑链建立起来。“责任链”的确是建立起来了,那么这条链是何时被触发的呢?LeaderZooKeeperServer是QuorumZooKeeperServer的直接子类。该类有一个QuorumPeer类型的成员self,而self有有一个Leader类型的成员leader。这个leader类型的变量完成了网络交互方面的工作,也是触发前文中“责任链”的地方。对于Leader的实现,我们有必要再看一段代码,首先是Leader的构造函数。

    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
        this.self = self;
        try {
            if (self.getQuorumListenOnAllIPs()) {
                ss = new ServerSocket(self.getQuorumAddress().getPort());
            } else {
                ss = new ServerSocket();
            }
            ss.setReuseAddress(true);
            if (!self.getQuorumListenOnAllIPs()) {
                ss.bind(self.getQuorumAddress());
            }
        } catch (BindException e) {
            if (self.getQuorumListenOnAllIPs()) {
                LOG.error("Couldn't bind to port " + self.getQuorumAddress().getPort(), e);
            } else {
                LOG.error("Couldn't bind to " + self.getQuorumAddress(), e);
            }
            throw e;
        }
        this.zk=zk;
    }

从这里就看出来了,Leader其实就是一个服务器,通过服务器套接字接受客户端和Follower的连接。涉及到套接字通信,其实重点关注收发的逻辑就好。对于接收和发送功能,Leader主要通过LearnerHandler来实现。首先来看接收功能吧!简略地贴点代码

            while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");
        /**略**/
        switch (qp.getType()) {
        /**略**/
        case Leader.REQUEST:                    
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    leader.zk.submitRequest(si);
                    break;
        }
        /**略**/
        }

LearnerHandler其实是Thread的子类,而上面的这段代码其实是它的run方法。这段代码的死循环就是请求处理的核心代码。如果QuorumPacket的类型是Request,那么将对报文进行简单的处理,然后提交给leader成员所指向的LeaderZooKeeperServer。而这个submitRequest()函数才是真正地实现了“责任链”的触发。而这个提交请求的函数做的事情也很简单,就是把request放到一个请求队列中。

Leader在ProposalRequestProcessor中会根据事务性请求生成对应的Proposal,同时把这个提议广播出去。在更进一步之前,是需要先等待followers的应答的。对于ack的应答的处理逻辑也在LearnerHandler中,而LearnerHandler也只是简单的把ack包提交给Leader的processAck()函数。processAck()函数的代码片段如下:

synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {   
        if (outstandingProposals.size() == 0) {
            return;
        }
        Proposal p = outstandingProposals.get(zxid);
        p.ackSet.add(sid);
        if (self.getQuorumVerifier().containsQuorum(p.ackSet)){         
            outstandingProposals.remove(zxid);
            if (p.request != null) {
                toBeApplied.add(p);
            }
            commit(zxid);
            inform(p);
            zk.commitProcessor.commit(p.request);
            if(pendingSyncs.containsKey(zxid)){
                for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                    sendSync(r);
                }
            }
        }
    }

这个函数的重要代码都被我筛选出来了,看起来并不复杂。从outstandingProposal中找到对应的proposal,然后把follow服务器的sid添加到这个proposal的ackSet中。然后判断是否已经有过半的服务器通过了该proposal,如果条件为真,那么就把这个proposal提交给commitProcessor。

而后的故事就在下一篇博客中再分析了。

发表评论