聊聊artemis的QuorumManager

编程

ClusterTopologyListener

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClusterTopologyListener.java

public interface ClusterTopologyListener {

/**

* Triggered when a node joins the cluster.

*

* @param member

* @param last if the whole cluster topology is being transmitted (after adding the listener to

* the cluster connection) this parameter will be {@code true} for the last topology

* member.

*/

void nodeUP(TopologyMember member, boolean last);

/**

* Triggered when a node leaves the cluster.

*

* @param eventUID

* @param nodeID the id of the node leaving the cluster

*/

void nodeDown(long eventUID, String nodeID);

}

  • ClusterTopologyListener接口定义了nodeUP、nodeDown方法

QuorumManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

public final class QuorumManager implements ClusterTopologyListener, ActiveMQComponent {

private final ExecutorService executor;

private final ClusterController clusterController;

/**

* all the current registered {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}"s

*/

private final Map<String, Quorum> quorums = new HashMap<>();

/**

* any currently running runnables.

*/

private final Map<QuorumVote, VoteRunnableHolder> voteRunnables = new HashMap<>();

private final Map<SimpleString, QuorumVoteHandler> handlers = new HashMap<>();

private boolean started = false;

/**

* this is the max size that the cluster has been.

*/

private int maxClusterSize = 0;

public QuorumManager(ExecutorService threadPool, ClusterController clusterController) {

this.clusterController = clusterController;

this.executor = threadPool;

}

/**

* we start by simply creating the server locator and connecting in a separate thread

*

* @throws Exception

*/

@Override

public void start() throws Exception {

if (started)

return;

started = true;

}

/**

* stops the server locator

*

* @throws Exception

*/

@Override

public void stop() throws Exception {

if (!started)

return;

synchronized (voteRunnables) {

started = false;

for (VoteRunnableHolder voteRunnableHolder : voteRunnables.values()) {

for (VoteRunnable runnable : voteRunnableHolder.runnables) {

runnable.close();

}

}

}

for (Quorum quorum : quorums.values()) {

quorum.close();

}

quorums.clear();

}

/**

* are we started

*

* @return

*/

@Override

public boolean isStarted() {

return started;

}

/**

* registers a {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum} so that it can be notified of changes in the cluster.

*

* @param quorum

*/

public void registerQuorum(Quorum quorum) {

quorums.put(quorum.getName(), quorum);

quorum.setQuorumManager(this);

}

/**

* unregisters a {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}.

*

* @param quorum

*/

public void unRegisterQuorum(Quorum quorum) {

quorums.remove(quorum.getName());

}

/**

* called by the {@link org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal} when the topology changes. we update the

* {@code maxClusterSize} if needed and inform the {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum}"s.

*

* @param topologyMember the topolgy changed

* @param last if the whole cluster topology is being transmitted (after adding the listener to

* the cluster connection) this parameter will be {@code true} for the last topology

*/

@Override

public void nodeUP(TopologyMember topologyMember, boolean last) {

final int newClusterSize = clusterController.getDefaultClusterSize();

maxClusterSize = newClusterSize > maxClusterSize ? newClusterSize : maxClusterSize;

for (Quorum quorum : quorums.values()) {

quorum.nodeUp(clusterController.getDefaultClusterTopology());

}

}

/**

* notify the {@link org.apache.activemq.artemis.core.server.cluster.qourum.Quorum} of a topology change.

*

* @param eventUID

* @param nodeID the id of the node leaving the cluster

*/

@Override

public void nodeDown(long eventUID, String nodeID) {

for (Quorum quorum : quorums.values()) {

quorum.nodeDown(clusterController.getDefaultClusterTopology(), eventUID, nodeID);

}

}

/**

* returns the maximum size this cluster has been.

*

* @return max size

*/

public int getMaxClusterSize() {

return maxClusterSize;

}

/**

* ask the quorum to vote within a specific quorum.

*

* @param quorumVote the vote to acquire

*/

public void vote(final QuorumVote quorumVote) {

List<VoteRunnable> runnables = new ArrayList<>();

synchronized (voteRunnables) {

if (!started)

return;

//send a vote to each node

ActiveMQServerLogger.LOGGER.initiatingQuorumVote(quorumVote.getName());

for (TopologyMemberImpl tm : clusterController.getDefaultClusterTopology().getMembers()) {

//but not ourselves

if (!tm.getNodeId().equals(clusterController.getNodeID().toString())) {

Pair<TransportConfiguration, TransportConfiguration> pair = tm.getConnector();

final TransportConfiguration serverTC = pair.getA();

VoteRunnable voteRunnable = new VoteRunnable(serverTC, quorumVote);

runnables.add(voteRunnable);

}

}

if (runnables.size() > 0) {

voteRunnables.put(quorumVote, new VoteRunnableHolder(quorumVote, runnables, runnables.size()));

for (VoteRunnable runnable : runnables) {

executor.submit(runnable);

}

} else {

quorumVote.allVotesCast(clusterController.getDefaultClusterTopology());

}

}

}

/**

* handle a vote received on the quorum

*

* @param handler the name of the handler to use for the vote

* @param vote the vote

* @return the updated vote

*/

public Vote vote(SimpleString handler, Vote vote) {

QuorumVoteHandler quorumVoteHandler = handlers.get(handler);

return quorumVoteHandler.vote(vote);

}

/**

* must be called by the quorum when it is happy on an outcome. only one vote can take place at anyone time for a

* specific quorum

*

* @param quorumVote the vote

*/

public void voteComplete(QuorumVoteServerConnect quorumVote) {

VoteRunnableHolder holder = voteRunnables.remove(quorumVote);

if (holder != null) {

for (VoteRunnable runnable : holder.runnables) {

runnable.close();

}

}

}

/**

* called to register vote handlers on the quorum

*

* @param quorumVoteHandler the vote handler

*/

public void registerQuorumHandler(QuorumVoteHandler quorumVoteHandler) {

handlers.put(quorumVoteHandler.getQuorumName(), quorumVoteHandler);

}

//......

}

  • QuorumManager实现了ClusterTopologyListener接口,它提供了registerQuorum方法用于注册quorum;其nodeUP方法会遍历quorums,挨个执行quorum.nodeUp(clusterController.getDefaultClusterTopology());其nodeDown方法会遍历quorums,挨个执行quorum.nodeDown(clusterController.getDefaultClusterTopology(), eventUID, nodeID);其vote方法会遍历clusterController.getDefaultClusterTopology().getMembers(),对于非clusterController.getNodeID()的创建VoteRunnable并添加到runnables中,对于runnables不为空的挨个提交到executor执行,否则执行quorumVote.allVotesCast(clusterController.getDefaultClusterTopology())

VoteRunnable

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

   private final class VoteRunnable implements Runnable {

private final TransportConfiguration serverTC;

private final QuorumVote quorumVote;

private ClusterControl clusterControl;

private VoteRunnable(TransportConfiguration serverTC, QuorumVote quorumVote) {

this.serverTC = serverTC;

this.quorumVote = quorumVote;

}

@Override

public void run() {

try {

Vote vote;

if (!started)

return;

//try to connect to the node i want to send a vote to

clusterControl = clusterController.connectToNode(serverTC);

clusterControl.authorize();

//if we are successful get the vote and check whether we need to send it to the target server,

//just connecting may be enough

vote = quorumVote.connected();

if (vote.isRequestServerVote()) {

vote = clusterControl.sendQuorumVote(quorumVote.getName(), vote);

quorumVote.vote(vote);

} else {

quorumVote.vote(vote);

}

} catch (Exception e) {

Vote vote = quorumVote.notConnected();

quorumVote.vote(vote);

} finally {

try {

if (clusterControl != null) {

clusterControl.close();

}

} catch (Exception e) {

//ignore

}

QuorumManager.this.votingComplete(quorumVote);

}

}

public void close() {

if (clusterControl != null) {

clusterControl.close();

}

}

}

  • VoteRunnable实现了Runnable接口,其run方法先执行quorumVote.connected(),对于vote.isRequestServerVote()的执行clusterControl.sendQuorumVote(quorumVote.getName(), vote),之后执行quorumVote.vote(vote)标记vote,最后执行QuorumManager.this.votingComplete(quorumVote)

VoteRunnableHolder

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumManager.java

   private final class VoteRunnableHolder {

private final QuorumVote quorumVote;

private final List<VoteRunnable> runnables;

private int size;

private VoteRunnableHolder(QuorumVote quorumVote, List<VoteRunnable> runnables, int size) {

this.quorumVote = quorumVote;

this.runnables = runnables;

this.size = size;

}

public synchronized void voteComplete() {

size--;

if (size <= 0) {

quorumVote.allVotesCast(clusterController.getDefaultClusterTopology());

}

}

}

  • VoteRunnableHolder的voteComplete会递减size,在最后size小于等于0时触发quorumVote.allVotesCast(clusterController.getDefaultClusterTopology()),标记所有投票已经发送出去

QuorumVoteMessage

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java

public class QuorumVoteMessage extends PacketImpl {

private SimpleString handler;

private Vote vote;

private ActiveMQBuffer voteBuffer;

public QuorumVoteMessage() {

super(QUORUM_VOTE);

}

public QuorumVoteMessage(SimpleString handler, Vote vote) {

super(QUORUM_VOTE);

this.handler = handler;

this.vote = vote;

}

@Override

public void encodeRest(ActiveMQBuffer buffer) {

super.encodeRest(buffer);

buffer.writeSimpleString(handler);

vote.encode(buffer);

}

@Override

public void decodeRest(ActiveMQBuffer buffer) {

super.decodeRest(buffer);

handler = buffer.readSimpleString();

voteBuffer = ActiveMQBuffers.fixedBuffer(buffer.readableBytes());

buffer.readBytes(voteBuffer);

}

public SimpleString getHandler() {

return handler;

}

public Vote getVote() {

return vote;

}

public void decode(QuorumVoteHandler voteHandler) {

vote = voteHandler.decode(voteBuffer);

}

@Override

public String toString() {

StringBuffer buff = new StringBuffer(getParentString());

buff.append("]");

return buff.toString();

}

@Override

public String getParentString() {

StringBuffer buff = new StringBuffer(super.getParentString());

buff.append(", vote=" + vote);

buff.append(", handler=" + handler);

return buff.toString();

}

}

  • QuorumVoteMessage继承了PacketImpl,其type为QUORUM_VOTE

ClusterControllerChannelHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java

   private final class ClusterControllerChannelHandler implements ChannelHandler {

private final Channel clusterChannel;

private final Acceptor acceptorUsed;

private final CoreRemotingConnection remotingConnection;

private final ChannelHandler channelHandler;

boolean authorized = false;

private ClusterControllerChannelHandler(Channel clusterChannel,

Acceptor acceptorUsed,

CoreRemotingConnection remotingConnection,

ChannelHandler channelHandler) {

this.clusterChannel = clusterChannel;

this.acceptorUsed = acceptorUsed;

this.remotingConnection = remotingConnection;

this.channelHandler = channelHandler;

}

@Override

public void handlePacket(Packet packet) {

if (!isStarted()) {

if (channelHandler != null) {

channelHandler.handlePacket(packet);

}

return;

}

if (!authorized) {

if (packet.getType() == PacketImpl.CLUSTER_CONNECT) {

ClusterConnection clusterConnection = acceptorUsed.getClusterConnection();

//if this acceptor isn"t associated with a cluster connection use the default

if (clusterConnection == null) {

clusterConnection = server.getClusterManager().getDefaultConnection(null);

}

ClusterConnectMessage msg = (ClusterConnectMessage) packet;

if (server.getConfiguration().isSecurityEnabled() && !clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword())) {

clusterChannel.send(new ClusterConnectReplyMessage(false));

} else {

authorized = true;

clusterChannel.send(new ClusterConnectReplyMessage(true));

}

}

} else {

if (packet.getType() == PacketImpl.NODE_ANNOUNCE) {

NodeAnnounceMessage msg = (NodeAnnounceMessage) packet;

Pair<TransportConfiguration, TransportConfiguration> pair;

if (msg.isBackup()) {

pair = new Pair<>(null, msg.getConnector());

} else {

pair = new Pair<>(msg.getConnector(), msg.getBackupConnector());

}

if (logger.isTraceEnabled()) {

logger.trace("Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);

}

if (acceptorUsed != null) {

ClusterConnection clusterConn = acceptorUsed.getClusterConnection();

if (clusterConn != null) {

String scaleDownGroupName = msg.getScaleDownGroupName();

clusterConn.nodeAnnounced(msg.getCurrentEventID(), msg.getNodeID(), msg.getBackupGroupName(), scaleDownGroupName, pair, msg.isBackup());

} else {

logger.debug("Cluster connection is null on acceptor = " + acceptorUsed);

}

} else {

logger.debug("there is no acceptor used configured at the CoreProtocolManager " + this);

}

} else if (packet.getType() == PacketImpl.QUORUM_VOTE) {

QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;

QuorumVoteHandler voteHandler = quorumManager.getVoteHandler(quorumVoteMessage.getHandler());

if (voteHandler == null) {

ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured();

return;

}

quorumVoteMessage.decode(voteHandler);

ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());

Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote());

ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());

clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));

} else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) {

ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet;

//we don"t really need to check as it should always be true

if (server.getNodeID().equals(message.getTargetNodeId())) {

server.addScaledDownNode(message.getScaledDownNodeId());

}

} else if (channelHandler != null) {

channelHandler.handlePacket(packet);

}

}

}

}

  • ClusterControllerChannelHandler实现了ChannelHandler接口,其handlePacket方法在接收到type为PacketImpl.QUORUM_VOTE的数据时会执行quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()),然后将返还的vote包装为QuorumVoteReplyMessage响应回去

QuorumVoteHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/QuorumVoteHandler.java

public interface QuorumVoteHandler {

/**

* @param vote

* @return

*/

Vote vote(Vote vote);

/**

* the name of the quorum vote

*

* @return the name

*/

SimpleString getQuorumName();

Vote decode(ActiveMQBuffer voteBuffer);

}

  • QuorumVoteHandler定义了vote、getQuorumName、decode方法

ServerConnectVoteHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConnectVoteHandler.java

public class ServerConnectVoteHandler implements QuorumVoteHandler {

private final ActiveMQServerImpl server;

public ServerConnectVoteHandler(ActiveMQServerImpl server) {

this.server = server;

}

@Override

public Vote vote(Vote vote) {

ServerConnectVote serverConnectVote = (ServerConnectVote) vote;

String nodeid = serverConnectVote.getNodeId();

try {

TopologyMemberImpl member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeid);

if (member != null && member.getLive() != null) {

ActiveMQServerLogger.LOGGER.nodeFoundInClusterTopology(nodeid);

return new ServerConnectVote(nodeid, (Boolean) vote.getVote(), member.getLive().toString());

}

ActiveMQServerLogger.LOGGER.nodeNotFoundInClusterTopology(nodeid);

} catch (Exception e) {

e.printStackTrace();

}

return new ServerConnectVote(nodeid, !((Boolean) vote.getVote()), null);

}

@Override

public SimpleString getQuorumName() {

return QuorumVoteServerConnect.LIVE_FAILOVER_VOTE;

}

@Override

public Vote decode(ActiveMQBuffer voteBuffer) {

ServerConnectVote vote = new ServerConnectVote();

vote.decode(voteBuffer);

return vote;

}

}

  • ServerConnectVoteHandler实现了QuorumVoteHandler接口,其vote方法根据nodeid获取topology的member,判断其是否alive,返回新的ServerConnectVote,其getQuorumName返回QuorumVoteServerConnect.LIVE_FAILOVER_VOTE

小结

QuorumManager提供了两个vote方法,只有QuorumVote参数的方法用于发起quorumVote,它会遍历clusterController.getDefaultClusterTopology().getMembers()去发起QuorumVoteMessage;另外一个方法vote方法是ClusterControllerChannelHandler接收到QuorumVoteMessage执行的方法,它将委托给了QuorumVoteHandler,然后响应QuorumVoteReplyMessage回去;VoteRunnable接收到QuorumVoteReplyMessage时会执行quorumVote.vote(vote)来统计投票

doc

  • QuorumManager

以上是 聊聊artemis的QuorumManager 的全部内容, 来源链接: utcz.com/z/513278.html

回到顶部