zookeeper默认选举算法为FastLeaderElection.java。其主要方法为FastLeaderElection.lookForLeader,该接口是一个同步接口,直到选举结束才会返回。选举的结果保存在类Vote中
选举整体过程主要流程可概括为下图:
来看源码实现
1.//首先logicalclock自增, 在这里logicalclock表示本次选举的id,逻辑时钟的值,这个值从0开始递增,每次选举对应一个值,如果在同一次选举中,这个值是一样的,逻辑时钟值越大,说明该节点上的这一次选举leader的进程更加新
- synchronized(this){
- logicalclock++;
- //如果自己不是OBSERVER,则投给自己
- updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
- }
- ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader, proposedZxid,logicalclock,QuorumPeer.ServerState.LOOKING,sid,proposedEpoch);
- 消息格式:
- mType type 消息类型
- long leader 推荐的leader的id,就是配置文件中写好的每个服务器的id
- long zxid 推荐的leader的zxid,zookeeper中的每份数据,都有一个对应的zxid值,越新的数据,zxid值就越大
- long epoch, logicalclock
- ServerState state, 本节点的状态
- long sid 本节点的 id,即myid
3.当该节点的状态为LOOKING且没有stop时,就一直loop到选出leader为止
- //从消息队列中接收消息
- Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
- //如没有接收到消息,则检查manager.haveDelivered(),如果已经全部发送出去了,就继续发送,一直到选出leader为止。否则就重新连接。
- if(manager.haveDelivered()){
- sendNotifications();
- } else {
- manager.connectAll();
- }
- int tmpTimeOut = notTimeout*2;//延长超时时间
- notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
- case LOOKING:
- // If notification > current, replace and send messages out
- if (n.electionEpoch > logicalclock) { //该节点的epoch大于 logicalclock,表示当前新一轮的选举
- logicalclock = n.electionEpoch;//更新本地的logicalclock
- recvset.clear();//清空接收队列recvset
- //调用totalOrderPredicate决定是否更新自己的投票,依次比较选举轮数epoch,事务zxid,服务器编号server id(myid)
- if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
- updateProposal(n.leader, n.zxid, n.peerEpoch);//把投票修改为对方的
- } else {
- updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
- }
- sendNotifications();//广播消息
- } else if (n.electionEpoch < logicalclock) { //如果该节点的epoch小于logicalclock,则忽略
- break;
- } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {
- updateProposal(n.leader, n.zxid, n.peerEpoch);
- sendNotifications();
- }
- recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//把从该节点的信息发到recvset中,表明已经收到该节点的回应
- //通过termPredicate函数判断recvset是否已经达到法定quorum,默认超过半数就通过
- if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {
- // Verify if there is any change in the proposed leader
- while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ //循环,一直等新的notification到达,直到超时
- if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){
- recvqueue.put(n);
- break;
- }
- }
- if (n == null) { //确定leader
- self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());
- Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
- leaveInstance(endVote);//清空接收队列
- return endVote;
- }
- }
- /*
- *
- * 返回true说明需要更新数据
- * We return true if one of the following three cases hold:
- * 1- New epoch is higher
- * 2- New epoch is the same as current epoch, but new zxid is higher
- * 3- New epoch is the same as current epoch, new zxid is the same
- * as current zxid, but server id is higher.
- */
- protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
- ...
- return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
- }
2.Zookeeper的客户端和服务会检查确保每个znode上的数据小于1M,因为Zookeeper为了提供高吞吐量,保存到内存里的数据量不宜过多
转载请注明来源:http://blog.csdn.net/odailidong/article/details/41855613