聊聊artemis的QuorumVote

编程

QuorumVote

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVote.java

/**
 * Base type for a vote that is put to the quorum.
 *
 * @param <V> the type of vote accepted by {@link #vote(Vote)}
 * @param <T> the type of the decision returned by {@link #getDecision()}
 */
public abstract class QuorumVote<V extends Vote, T> {

    // Assigned once in the constructor and only ever read afterwards.
    private final SimpleString name;

    /**
     * @param name the name of this quorum vote, returned by {@link #getName()}
     */
    public QuorumVote(SimpleString name) {
        this.name = name;
    }

    /**
     * Called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when one of the
     * nodes in the quorum is successfully connected to. The QuorumVote can then decide whether or not a decision
     * can be made with just that information.
     *
     * @return the vote to use
     */
    public abstract Vote connected();

    /**
     * Called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when it fails to
     * connect to a node in the quorum. The QuorumVote can then decide whether or not a decision can be made with
     * just that information; the node itself cannot be asked.
     *
     * @return the vote to use
     */
    public abstract Vote notConnected();

    /**
     * Called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when a vote can be
     * made, either from the cluster or decided by itself.
     *
     * @param vote the vote to make.
     */
    public abstract void vote(V vote);

    /**
     * Gets the decision of the vote.
     *
     * @return the voting decision
     */
    public abstract T getDecision();

    /**
     * Called by the {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumManager} when all the votes
     * have been cast and received.
     *
     * @param voteTopology the topology of where the votes were sent.
     */
    public abstract void allVotesCast(Topology voteTopology);

    /**
     * The name of this quorum vote, used for identifying the correct
     * {@link org.apache.activemq.artemis.core.server.cluster.qourum.QuorumVoteHandler}.
     *
     * @return the name of the quorum vote
     */
    public SimpleString getName() {
        return name;
    }
}

  • QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast抽象方法

QuorumVoteServerConnect

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteServerConnect.java

/**
 * Quorum vote registered under {@link #LIVE_FAILOVER_VOTE}. It is cast either by a live server requesting to stay
 * live or by a backup requesting to become live, depending on {@code requestToStayLive}.
 * <p>
 * The decision becomes {@code true} once the number of positive votes reaches {@code votesNeeded}.
 * {@link #vote(ServerConnectVote)} records its outcome before it releases the latch, so a thread that returns from
 * {@link #await(int, TimeUnit)} because the latch reached zero sees the final decision.
 */
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote, Boolean> {

    public static final SimpleString LIVE_FAILOVER_VOTE = new SimpleString("LiveFailoverQuorumVote");

    private final CountDownLatch latch;

    private final String targetNodeId;

    private final String liveConnector;

    private final int votesNeeded;

    // Is this the live requesting to stay live, or a backup requesting to become live.
    private final boolean requestToStayLive;

    // Number of positive votes received so far; only touched inside the synchronized vote().
    private int total = 0;

    // volatile because it is written under the vote() lock but read without it by getDecision().
    private volatile boolean decision = false;

    /**
     * Creates the vote and works out how many positive votes are required.
     * <pre>
     * live nodes | remaining nodes |  majority  | votes needed
     *     1      |       0         |     0      |      0
     *     2      |       1         |     1      |      1
     *     n      |    r = n-1      |  n/2 + 1   |  n/2 + 1 rounded down
     *     3      |       2         |     2.5    |      2
     *     4      |       3         |     3      |      3
     *     5      |       4         |     3.5    |      3
     *     6      |       5         |     4      |      4
     * </pre>
     * When no votes are needed the decision is {@code true} straight away.
     *
     * @param size              the size the number of needed votes is derived from
     * @param targetNodeId      the node id placed in the vote returned by {@link #connected()}
     * @param requestToStayLive {@code true} if this is the live requesting to stay live, {@code false} if it is a
     *                          backup requesting to become live
     * @param liveConnector     the connector compared against the one carried by each received vote when
     *                          {@code requestToStayLive} is set; may be {@code null}, which disables the comparison
     */
    public QuorumVoteServerConnect(int size, String targetNodeId, boolean requestToStayLive, String liveConnector) {
        super(LIVE_FAILOVER_VOTE);
        this.targetNodeId = targetNodeId;
        this.liveConnector = liveConnector;
        this.requestToStayLive = requestToStayLive;

        // Integer division truncates, which is exactly the "rounded down" column of the table above:
        // up to two nodes need size / 2 votes, larger clusters need size / 2 + 1.
        this.votesNeeded = size <= 2 ? size / 2 : size / 2 + 1;
        this.latch = new CountDownLatch(votesNeeded);
        if (votesNeeded == 0) {
            decision = true;
        }
    }

    /**
     * Creates a vote for a backup requesting to become live, with no live connector to compare against.
     *
     * @param size         the size the number of needed votes is derived from
     * @param targetNodeId the node id placed in the vote returned by {@link #connected()}
     */
    public QuorumVoteServerConnect(int size, String targetNodeId) {
        this(size, targetNodeId, false, null);
    }

    /**
     * The vote to use if we can connect to a node.
     *
     * @return a new {@link ServerConnectVote} carrying the target node id and the {@code requestToStayLive} flag
     */
    @Override
    public Vote connected() {
        return new ServerConnectVote(targetNodeId, requestToStayLive, null);
    }

    /**
     * The vote to use if we can't connect to the node.
     *
     * @return a {@link BooleanVote} of {@code false}
     */
    @Override
    public Vote notConnected() {
        return new BooleanVote(false);
    }

    /**
     * Records a received vote. Votes are ignored once the decision is {@code true}, and negative votes are never
     * counted. Every positive vote increments the running total and counts the latch down once.
     * <p>
     * When {@code requestToStayLive} is set and the vote carries a connector different from {@code liveConnector},
     * the vote is counted but can not make the decision {@code true}; this is logged as the backup being live.
     *
     * @param vote the vote to make.
     */
    @Override
    public synchronized void vote(ServerConnectVote vote) {
        if (decision) {
            return;
        }
        if (!vote.getVote()) {
            return;
        }

        total++;
        try {
            if (requestToStayLive && liveConnector != null && !liveConnector.equals(vote.getTransportConfiguration())) {
                // A node answering with another connector means the backup has come live.
                ActiveMQServerLogger.LOGGER.qourumBackupIsLive(liveConnector);
                return;
            }
            if (total >= votesNeeded) {
                decision = true;
            }
        } finally {
            // Release waiters only after the outcome of this vote has been recorded; counting down first would
            // let a thread leave await() and read the decision before it is updated.
            latch.countDown();
        }
    }

    /**
     * Drains the latch so that any thread blocked in {@link #await(int, TimeUnit)} is released.
     *
     * @param voteTopology the topology of where the votes were sent; not used by this implementation
     */
    @Override
    public void allVotesCast(Topology voteTopology) {
        while (latch.getCount() > 0) {
            latch.countDown();
        }
    }

    /**
     * @return {@code true} once enough positive votes were received, or immediately if none were needed
     */
    @Override
    public Boolean getDecision() {
        return decision;
    }

    /**
     * Waits until the latch reaches zero or the timeout elapses, logging which of the two happened.
     *
     * @param latchTimeout the maximum time to wait
     * @param unit         the unit of {@code latchTimeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void await(int latchTimeout, TimeUnit unit) throws InterruptedException {
        // Locale.ROOT keeps the logged unit name stable regardless of the JVM's default locale.
        ActiveMQServerLogger.LOGGER.waitingForQuorumVoteResults(latchTimeout, unit.toString().toLowerCase(java.util.Locale.ROOT));
        if (latch.await(latchTimeout, unit)) {
            ActiveMQServerLogger.LOGGER.receivedAllQuorumVotes();
        } else {
            ActiveMQServerLogger.LOGGER.timeoutWaitingForQuorumVoteResponses();
        }
    }

    /**
     * @return {@code true} if this is the live requesting to stay live, {@code false} if it is a backup requesting
     * to become live
     */
    public boolean isRequestToStayLive() {
        return requestToStayLive;
    }
}

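  • QuorumVoteServerConnect继承了QuorumVote。其构造器根据size计算votesNeeded(size小于等于2时为size/2,否则为size/2+1,结果向下取整),并以votesNeeded初始化latch;当votesNeeded为0时decision直接为true。其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false)。其vote方法在decision已为true时直接返回;当ServerConnectVote的vote为true时递增total并执行latch.countDown(),在total大于等于votesNeeded时将decision更新为true(若requestToStayLive为true、liveConnector不为null且与vote的transportConfiguration不一致,则只记录日志并返回,不更新decision)。其allVotesCast方法循环执行latch.countDown(),直到latch的计数归零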

小结

QuorumVote是个抽象类,定义了connected、notConnected、vote、getDecision、allVotesCast这几个抽象方法。QuorumVoteServerConnect继承了QuorumVote:其构造器根据size初始化votesNeeded及decision;其connected方法返回ServerConnectVote;其notConnected方法返回BooleanVote(false);其vote方法在ServerConnectVote的vote为true时递增total并执行latch.countDown(),在total大于等于votesNeeded时将decision更新为true;其allVotesCast方法则循环执行latch.countDown(),直到latch的计数归零

doc

  • QuorumVoteServerConnect

以上是 聊聊artemis的QuorumVote 的全部内容, 来源链接: utcz.com/z/513328.html

回到顶部