聊聊artemis的SharedNothingBackupQuorum

编程

SharedNothingBackupQuorum

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java

public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener {

private TransportConfiguration liveTransportConfiguration;

/**
 * Signal values exchanged between this quorum and the backup activation loop.
 *
 * As used in the code visible in this excerpt:
 * - FAIL_OVER: set by decideOnAction when the live is assumed down; the activation loop breaks out to fail over.
 * - FAILURE_REPLICATING: set by decideOnAction when the vote or the network health check fails;
 *   the activation stops and restarts the server.
 * - ALREADY_REPLICATING: keeps the activation's do-while loop retrying against a live server.
 * - STOP: makes the activation give up and return.
 */
public enum BACKUP_ACTIVATION {

FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;

}

private QuorumManager quorumManager;

private String targetServerID = "";

private final NodeManager nodeManager;

private final StorageManager storageManager;

private final ScheduledExecutorService scheduledPool;

private final int quorumSize;

private final int voteRetries;

private final long voteRetryWait;

private final Object voteGuard = new Object();

private CountDownLatch latch;

private ClientSessionFactoryInternal sessionFactory;

private CoreRemotingConnection connection;

private final NetworkHealthCheck networkHealthCheck;

private volatile boolean stopped = false;

private final int quorumVoteWait;

//......

/**
 * Topology callback for a node going down. Only a node whose id equals
 * {@code targetServerID} triggers the fail-over decision; all others are ignored.
 *
 * @param topology the topology handed on to {@link #decideOnAction(Topology)}
 * @param eventUID not inspected by this method
 * @param nodeID   id of the node reported as down
 */
@Override
public void nodeDown(Topology topology, long eventUID, String nodeID) {
   // Not the node we are tracking: nothing to decide.
   if (!targetServerID.equals(nodeID)) {
      return;
   }
   decideOnAction(topology);
}

/**
 * Topology callback for a node coming up. This quorum takes no action on it.
 *
 * @param topology not inspected by this method
 */
@Override
public void nodeUp(Topology topology) {
   // intentionally empty
}

/**
 * If the connection to our replicated live goes down, decide on an action using the
 * topology currently held by the session factory's server locator.
 *
 * @param exception  the reported failure; not inspected by this method
 * @param failedOver not inspected by this method
 */
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
   final Topology knownTopology = sessionFactory.getServerLocator().getTopology();
   decideOnAction(knownTopology);
}

/**
 * Three-argument variant of the connection-failure callback. The scale-down target
 * is ignored and the call is handled exactly like the two-argument form.
 *
 * @param me                    the reported failure
 * @param failedOver            passed through unchanged
 * @param scaleDownTargetNodeID not used
 */
@Override
public void connectionFailed(final ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
   this.connectionFailed(me, failedOver);
}

/**
 * Callback invoked before a reconnect attempt. This quorum takes no action on it.
 *
 * @param exception not inspected by this method
 */
@Override
public void beforeReconnect(ActiveMQException exception) {
   // intentionally empty
}

/**
 * Decides which BACKUP_ACTIVATION signal to raise after the live node was reported down
 * (nodeDown) or the connection to it failed (connectionFailed).
 *
 * Steps, all performed while holding decisionGuard:
 * 1. If the signal is already FAIL_OVER, only re-run the network health check (when one is set),
 *    downgrade to FAILURE_REPLICATING if it fails, and return immediately.
 * 2. Otherwise run the quorum vote via isLiveDown() and set FAIL_OVER or FAILURE_REPLICATING.
 * 3. If a network health check is set and non-empty, its result then replaces the vote result.
 *
 * After leaving the synchronized block the latch is counted down.
 *
 * @param topology not read anywhere in this method body
 */
public void decideOnAction(Topology topology) {

//we may get called via multiple paths so need to guard

synchronized (decisionGuard) {

if (signal == BACKUP_ACTIVATION.FAIL_OVER) {

// Already failing over: the only thing that can still change the outcome is a failed health check.
if (networkHealthCheck != null && !networkHealthCheck.check()) {

signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;

}

// NOTE(review): this early return skips latch.countDown() below — presumably because the latch
// was already released when FAIL_OVER was first set; confirm against the rest of the class.
return;

}

if (!isLiveDown()) {

//lost connection but don't know if live is down so restart as backup as we can't replicate any more

ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();

signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;

} else {

// live is assumed to be down, backup fails-over

ActiveMQServerLogger.LOGGER.failingOverBasedOnQuorumVoteResults();

signal = BACKUP_ACTIVATION.FAIL_OVER;

}

/* use NetworkHealthCheck to determine if node is isolated

* if there are no addresses/urls configured then ignore and rely on quorum vote only

*/

// NOTE(review): when the health check is configured its result overwrites the vote result in both
// directions — a passing check sets FAIL_OVER even if the vote above said the live is not down.
if (networkHealthCheck != null && !networkHealthCheck.isEmpty()) {

if (networkHealthCheck.check()) {

// live is assumed to be down, backup fails-over

signal = BACKUP_ACTIVATION.FAIL_OVER;

} else {

ActiveMQServerLogger.LOGGER.serverIsolatedOnNetwork();

signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;

}

}

}

// Release whoever is waiting on the latch (the waiting side is not shown in this excerpt).
latch.countDown();

}

/**
 * Asks the quorum whether the live server should be considered down.
 *
 * Votes are run one at a time while holding {@code voteGuard}. Each attempt submits a
 * {@code QuorumVoteServerConnect}, waits up to {@code quorumVoteWait} seconds for it,
 * marks it complete and reads its decision. A positive decision ends the method at once;
 * otherwise the thread waits on {@code voteGuard} for {@code voteRetryWait} before the
 * next attempt. At most {@code voteRetries} attempts are made, and none is started once
 * {@code stopped} is set.
 *
 * @return {@code true} as soon as one vote decides the live is down; {@code false} if no
 *         attempt did (the live is assumed to still be up)
 */
private boolean isLiveDown() {
   // A quorumSize of -1 defers to the cluster size reported by the quorum manager.
   final int voters = (quorumSize != -1) ? quorumSize : quorumManager.getMaxClusterSize();

   synchronized (voteGuard) {
      for (int attempt = 0; !stopped && attempt < voteRetries; attempt++) {
         final QuorumVoteServerConnect ballot = new QuorumVoteServerConnect(voters, targetServerID);
         quorumManager.vote(ballot);

         try {
            ballot.await(quorumVoteWait, TimeUnit.SECONDS);
         } catch (InterruptedException interruption) {
            // Nothing to recover: the vote is read in whatever state it reached.
            ActiveMQServerLogger.LOGGER.quorumVoteAwaitInterrupted();
         }

         quorumManager.voteComplete(ballot);

         if (ballot.getDecision()) {
            return true;
         }

         try {
            // Pause before the next attempt.
            voteGuard.wait(voteRetryWait);
         } catch (InterruptedException ignored) {
            // nothing to do here; move on to the next attempt
         }
      }
   }

   // Every attempt (if any ran) came back negative.
   return false;
}

/**
 * Replaces the current latch with a new one whose count is 1.
 */
public synchronized void reset() {
   this.latch = new CountDownLatch(1);
}

//......

}

  • SharedNothingBackupQuorum的nodeDown(仅当nodeID等于targetServerID时)及connectionFailed方法都会执行decideOnAction;该方法在signal已经为BACKUP_ACTIVATION.FAIL_OVER时,只在networkHealthCheck不为null的情况下执行networkHealthCheck.check(),若返回false则将signal更新为BACKUP_ACTIVATION.FAILURE_REPLICATING;无论检查结果如何,该分支都会直接返回,不再执行后面的latch.countDown()
  • 对于signal为其他值的情况,先执行isLiveDown方法:若返回false则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING,否则更新signal为BACKUP_ACTIVATION.FAIL_OVER;随后如果networkHealthCheck不为null且isEmpty()为false,还会执行networkHealthCheck.check(),返回true则将signal设为BACKUP_ACTIVATION.FAIL_OVER,否则设为BACKUP_ACTIVATION.FAILURE_REPLICATING(也就是说,此时网络健康检查的结果会覆盖投票得出的结果);最后执行latch.countDown()
  • isLiveDown方法在持有voteGuard锁的情况下循环投票:每次创建QuorumVoteServerConnect,执行quorumManager.vote(quorumVote),再通过quorumVote.await(quorumVoteWait, TimeUnit.SECONDS)等待投票结果,然后执行quorumManager.voteComplete(quorumVote)并读取quorumVote.getDecision();若为true则立刻返回true,否则执行voteGuard.wait(voteRetryWait)后重试;最多尝试voteRetries次,stopped为true时也会结束循环,这两种情况下都返回false

SharedNothingBackupActivation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java

public final class SharedNothingBackupActivation extends Activation {

//......

/**
 * Backup activation loop (excerpt; sections marked //...... are elided).
 *
 * Marks the server as STARTED, creates a SharedNothingBackupQuorum and registers it with the
 * quorum manager, then loops: locate a live node, try to connect to it, start the endpoint
 * connector and block in backupQuorum.waitForStatusChange() until a signal arrives.
 *
 * Handling of the signal inside the loop:
 * - STOP (or the server is no longer started): return.
 * - FAIL_OVER: break out of the loop.
 * - FAILURE_REPLICATING: stop and start the server on a new thread, then return.
 * - otherwise: close the cluster control, reset the quorum, close the replication channel;
 *   the loop repeats while the signal is ALREADY_REPLICATING.
 *
 * Exceptions are logged through ActiveMQServerLogger, except InterruptedException and
 * IllegalStateException raised while the server is not started, which are dropped.
 */
public void run() {

try {

logger.trace("SharedNothingBackupActivation..start");

synchronized (activeMQServer) {

activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);

}

//......

synchronized (this) {

logger.trace("Entered a synchronized");

// The activation may have been closed before this point; do not create or register the quorum then.
if (closed)

return;

backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());

activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);

activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer));

}

//......

SharedNothingBackupQuorum.BACKUP_ACTIVATION signal;

do {

if (closed) {

if (logger.isTraceEnabled()) {

logger.trace("Activation is closed, so giving up");

}

return;

}

if (logger.isTraceEnabled()) {

logger.trace("looking up the node through nodeLocator.locateNode()");

}

//locate the first live server to try to replicate

nodeLocator.locateNode();

Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration();

nodeID = nodeLocator.getNodeID();

if (logger.isTraceEnabled()) {

logger.trace("nodeID = " + nodeID);

}

//in a normal (non failback) scenario if we couldn't find our live server we should fail

if (!attemptFailBack) {

if (logger.isTraceEnabled()) {

logger.trace("attemptFailback=false, nodeID=" + nodeID);

}

//this shouldn't happen

if (nodeID == null) {

logger.debug("Throwing a RuntimeException as nodeID==null ant attemptFailback=false");

throw new RuntimeException("Could not establish the connection");

}

activeMQServer.getNodeManager().setNodeID(nodeID);

}

// Try the first connector of the pair, then fall back to the second one.
if (possibleLive != null) {

clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getA());

if (clusterControl == null) {

clusterControl = tryConnectToNodeInReplicatedCluster(clusterController, possibleLive.getB());

}

} else {

clusterControl = null;

}

if (clusterControl == null) {

if (logger.isTraceEnabled()) {

logger.trace("sleeping " + clusterController.getRetryIntervalForReplicatedCluster() + " it should retry");

}

//it's ok to retry here since we haven't started replication yet

//it may just be the server has gone since discovery

Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());

// ALREADY_REPLICATING satisfies the do-while condition, so "continue" starts another attempt.
signal = SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING;

continue;

}

activeMQServer.getThreadPool().execute(endpointConnector);

/**

* Wait for a signal from the quorum manager, at this point if replication has been successful we can

* fail over or if there is an error trying to replicate (such as already replicating) we try the

* process again on the next live server. All the action happens inside {@link BackupQuorum}

*/

signal = backupQuorum.waitForStatusChange();

if (logger.isTraceEnabled()) {

logger.trace("Got a signal " + signal + " through backupQuorum.waitForStatusChange()");

}

/**

* replicationEndpoint will be holding lots of open files. Make sure they get

* closed/sync'ed.

*/

ActiveMQServerImpl.stopComponent(replicationEndpoint);

// time to give up

if (!activeMQServer.isStarted() || signal == STOP) {

if (logger.isTraceEnabled()) {

logger.trace("giving up on the activation:: activemqServer.isStarted=" + activeMQServer.isStarted() + " while signal = " + signal);

}

return;

} else if (signal == FAIL_OVER) {

// time to fail over

if (logger.isTraceEnabled()) {

logger.trace("signal == FAIL_OVER, breaking the loop");

}

break;

} else if (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING) {

// something has gone badly wrong: restart from scratch

if (logger.isTraceEnabled()) {

logger.trace("Starting a new thread to stop the server!");

}

// The restart runs on its own thread; this method returns right after starting it.
Thread startThread = new Thread(new Runnable() {

@Override

public void run() {

try {

if (logger.isTraceEnabled()) {

logger.trace("Calling activeMQServer.stop() and start() to restart the server");

}

activeMQServer.stop();

activeMQServer.start();

} catch (Exception e) {

ActiveMQServerLogger.LOGGER.errorRestartingBackupServer(e, activeMQServer);

}

}

});

startThread.start();

return;

}

// Reached only when none of the branches above returned or broke out of the loop.

//ok, this live is no good, let's reset and try again

//close this session factory, we're done with it

clusterControl.close();

backupQuorum.reset();

if (replicationEndpoint.getChannel() != null) {

replicationEndpoint.getChannel().close();

replicationEndpoint.setChannel(null);

}

}

while (signal == SharedNothingBackupQuorum.BACKUP_ACTIVATION.ALREADY_REPLICATING);

//......

} catch (Exception e) {

if (logger.isTraceEnabled()) {

logger.trace(e.getMessage() + ", serverStarted=" + activeMQServer.isStarted(), e);

}

if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted())

// do not log these errors if the server is being stopped.

return;

ActiveMQServerLogger.LOGGER.initializationError(e);

}

}

//......

}

  • SharedNothingBackupActivation的run方法会创建SharedNothingBackupQuorum并注册到QuorumManager,然后在do-while循环中定位live节点、尝试连接,并通过backupQuorum.waitForStatusChange()获取signal;若signal为STOP(或server已不处于started状态)则直接返回,为FAIL_OVER则跳出循环,为FAILURE_REPLICATING则新起线程执行activeMQServer.stop()及activeMQServer.start()后返回;只有在其余情况(即signal为ALREADY_REPLICATING)下才会关闭clusterControl、执行backupQuorum.reset()并进入下一轮循环;连接不上live节点时也会在sleep之后将signal置为ALREADY_REPLICATING以便重试

小结

SharedNothingBackupQuorum的nodeDown(仅当nodeID等于targetServerID时)及connectionFailed方法都会执行decideOnAction;该方法在signal已经为BACKUP_ACTIVATION.FAIL_OVER时,只在networkHealthCheck不为null的情况下执行networkHealthCheck.check(),若返回false则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING,然后无论检查结果如何都直接返回;对于signal为其他值的情况,先执行isLiveDown方法进行quorum投票,若返回false则更新signal为BACKUP_ACTIVATION.FAILURE_REPLICATING,否则更新signal为BACKUP_ACTIVATION.FAIL_OVER;随后如果networkHealthCheck不为null且isEmpty()为false,还会执行networkHealthCheck.check(),返回true则将signal设为BACKUP_ACTIVATION.FAIL_OVER,否则设为BACKUP_ACTIVATION.FAILURE_REPLICATING,即网络健康检查的结果会覆盖投票结果;最后执行latch.countDown()

doc

  • SharedNothingBackupQuorum

以上是 聊聊artemis的SharedNothingBackupQuorum 的全部内容, 来源链接: utcz.com/z/513324.html

回到顶部