FastLeaderElection.lookForLeader
选举核心方法
/**
 * Core election routine: runs one leader-election round for this peer and
 * returns the vote the round settled on.
 *
 * <p>The method increments {@code logicalClock}, proposes this node's own
 * initial values, sends notifications, and then keeps consuming entries from
 * {@code recvQueue} for as long as the peer state is {@code LOOKING} and
 * {@code stop} is not set. Each notification is handled according to the
 * sender's state ({@code LOOKING}, {@code OBSERVING}, {@code FOLLOWING} or
 * {@code LEADING}). When the round is decided, the peer state is updated,
 * {@code leaveInstance} is called with the final vote, and that vote is
 * returned.
 *
 * <p>On every exit path the {@code finally} block unregisters
 * {@code self.jmxLeaderElectionBean} (if it is non-null) and clears the field.
 *
 * @return the final {@link Vote} once the election is decided, or {@code null}
 *         if the loop ends (state no longer {@code LOOKING}, or {@code stop}
 *         set) without a decision
 * @throws InterruptedException propagated from the timed {@code poll} /
 *         {@code put} calls on {@code recvQueue} (presumably a blocking queue;
 *         its declaration is not visible here)
 */
public Vote lookForLeader() throws InterruptedException {
try {
// Ballot box for the current round. It is filled in the LOOKING case and,
// when the sender's election epoch equals ours, in the FOLLOWING/LEADING case.
HashMap<Long, Vote> recvVoteSet = new HashMap<>();
// Ballot box for votes reported by peers that already left the election
// (notifications whose state is FOLLOWING or LEADING).
HashMap<Long, Vote> outOfElection = new HashMap<>();
synchronized (this) {
// Advance this node's election round.
logicalClock.incrementAndGet();
// Initialise the proposal with this node's own initial id, zxid and epoch.
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
// Broadcast the vote (sendNotifications is defined elsewhere).
sendNotifications();
// Keep electing while this peer is LOOKING and has not been asked to stop.
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// Wait up to 200 ms for the next notification.
Notification n = recvQueue.poll(200, TimeUnit.MILLISECONDS);
if (n == null) {
// Nothing arrived in time. Presumably haveDelivered() reports whether the
// earlier messages went out: if so, notify again; otherwise ask the
// connection manager to connect to all peers. (Semantics not visible here.)
if (manager.haveDelivered()) {
sendNotifications();
} else {
manager.connectAll();
}
// Only handle the notification if both the sender and the leader it
// proposes pass validVoter.
} else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
// The sender is itself still electing (LOOKING).
case LOOKING:
// The sender's election round is newer than ours: adopt its round number
// and empty the ballot box.
if (n.electionEpoch > logicalClock.get()) {
logicalClock.set(n.electionEpoch);
recvVoteSet.clear();
// Compare the sender's vote against this node's own initial values.
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
// The sender's vote wins: adopt it as our proposal.
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
// Otherwise fall back to proposing our own initial values.
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications();
// The sender's election round is older than ours.
} else if (n.electionEpoch < logicalClock.get()) {
// Ignore it: this break leaves the switch, so the vote is never added to
// the ballot box. (Original note: WorkerReceiver already handles this case.)
break;
// Same election round (per the original note, the most common case).
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
// The sender's vote beats our current proposal: adopt it and notify again.
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
// Record the sender's vote in the ballot box.
recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// Check the collected votes against our current proposal (per the original
// note, termPredicate performs the more-than-half count).
if (termPredicate(recvVoteSet, new Vote(proposedLeader, proposedZxid,
logicalClock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader: drain the queue,
// waiting up to 200 ms per entry. A notification that beats the current
// proposal is put back on the queue and the drain stops with n != null.
while ((n = recvQueue.poll(200, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
recvQueue.put(n);
break;
}
}
// n == null means the drain timed out: the quorum check passed and no new
// vote arrived within 200 ms, so the election is considered finished.
if (n == null) {
// Become LEADING if we proposed ourselves, otherwise take learningState().
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalClock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
// Notifications from OBSERVING peers are ignored.
case OBSERVING:
break;
// The sender has already finished electing (FOLLOWING or LEADING).
case FOLLOWING:
case LEADING:
// The sender is in the same election round as this node.
if (n.electionEpoch == logicalClock.get()) {
recvVoteSet.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// If ooePredicate accepts the current-round votes together with the
// out-of-election votes, finish with the sender's leader.
if (ooePredicate(recvVoteSet, outOfElection, n)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// Regardless of round, remember this vote among the out-of-election votes.
outOfElection.put(n.sid, new Vote(n.version,
n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch,
n.state));
// If the out-of-election votes alone satisfy ooePredicate, adopt the
// sender's election round and finish with its leader.
if (ooePredicate(outOfElection, outOfElection, n)) {
synchronized (this) {
logicalClock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : learningState());
}
Vote endVote = new Vote(n.leader,
n.zxid,
n.electionEpoch,
n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
// Any other state is logged and skipped.
default:
LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid);
break;
}
} else {
// The notification failed validation; log which check(s) failed.
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
// The loop ended without a decision (state changed or stop was set).
return null;
} finally {
// Always unregister the election JMX bean, if one is set; a failure here is
// only logged so that it cannot mask the method's result.
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
原文:https://www.cnblogs.com/zhwcs/p/13755069.html