聊聊artemis的callFailoverTimeout

编程

establishNewConnection

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {

//......

/**
 * Creates a transport connection and wraps it in a {@link RemotingConnection} obtained from
 * {@code clientProtocolManager.connect(...)}.
 *
 * <p>The factory's {@code callTimeout} and {@code callFailoverTimeout} are handed to the
 * protocol manager together with the interceptor lists. A {@code DelegatingFailureListener}
 * is registered on the new connection and {@code schedulePing()} is invoked before returning.
 *
 * @return the new remoting connection, or {@code null} when {@code createTransportConnection()}
 *         returned {@code null}
 */
protected RemotingConnection establishNewConnection() {
   final Connection transport = createTransportConnection();

   if (transport != null) {
      final RemotingConnection established = clientProtocolManager.connect(transport,
                                                                           callTimeout,
                                                                           callFailoverTimeout,
                                                                           incomingInterceptors,
                                                                           outgoingInterceptors,
                                                                           new SessionFactoryTopologyHandler());

      established.addFailureListener(new DelegatingFailureListener(established.getID()));

      schedulePing();

      if (logger.isTraceEnabled()) {
         logger.trace("returning " + established);
      }

      return established;
   }

   // No transport connection could be created: report it at trace level and give up.
   if (logger.isTraceEnabled()) {
      logger.trace("Neither backup or live were active, will just give up now");
   }

   return null;
}

//......

}

  • ClientSessionFactoryImpl的establishNewConnection通过clientProtocolManager.connect创建RemotingConnection

ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {

//......

/**
 * Builds a {@link RemotingConnectionImpl} on top of the given transport connection, installs a
 * {@code Channel0Handler} on channel 0 and sends the handshake.
 *
 * @param transportConnection     the transport the remoting connection is created on
 * @param callTimeout             passed to the connection as its blocking call timeout
 * @param callFailoverTimeout     passed to the connection as its blocking call failover timeout
 * @param incomingInterceptors    interceptors handed to the new connection
 * @param outgoingInterceptors    interceptors handed to the new connection
 * @param topologyResponseHandler stored on this protocol manager
 * @return the newly created remoting connection
 */
public RemotingConnection connect(Connection transportConnection,
                                  long callTimeout,
                                  long callFailoverTimeout,
                                  List<Interceptor> incomingInterceptors,
                                  List<Interceptor> outgoingInterceptors,
                                  TopologyResponseHandler topologyResponseHandler) {
   this.connection = new RemotingConnectionImpl(createPacketDecoder(),
                                                transportConnection,
                                                callTimeout,
                                                callFailoverTimeout,
                                                incomingInterceptors,
                                                outgoingInterceptors,
                                                executor);

   this.topologyResponseHandler = topologyResponseHandler;

   // Channel 0 gets its handler before the handshake goes out.
   getChannel0().setHandler(new Channel0Handler(this.connection));

   sendHandshake(transportConnection);

   return this.connection;
}

//......

}

  • ActiveMQClientProtocolManager的connect方法创建了RemotingConnectionImpl

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

//......

private final long blockingCallTimeout;

private final long blockingCallFailoverTimeout;

//......

/**
 * Creates a remoting connection by delegating to the private constructor with
 * {@code client = true} and {@code nodeID = null}.
 *
 * @param packetDecoder               decoder stored on the connection
 * @param transportConnection         the underlying transport connection
 * @param blockingCallTimeout         value later returned by {@link #getBlockingCallTimeout()}
 * @param blockingCallFailoverTimeout value later returned by {@link #getBlockingCallFailoverTimeout()}
 * @param incomingInterceptors        interceptors stored on the connection
 * @param outgoingInterceptors        interceptors stored on the connection
 * @param connectionExecutor          executor passed to the superclass
 */
public RemotingConnectionImpl(final PacketDecoder packetDecoder,
                              final Connection transportConnection,
                              final long blockingCallTimeout,
                              final long blockingCallFailoverTimeout,
                              final List<Interceptor> incomingInterceptors,
                              final List<Interceptor> outgoingInterceptors,
                              final Executor connectionExecutor) {
   this(packetDecoder,
        transportConnection,
        blockingCallTimeout,
        blockingCallFailoverTimeout,
        incomingInterceptors,
        outgoingInterceptors,
        true, // client
        null, // nodeID
        connectionExecutor);
}

/**
 * Stores all constructor arguments in the corresponding fields and registers this object as the
 * protocol connection of {@code transportConnection}.
 *
 * @param packetDecoder               decoder stored on the connection
 * @param transportConnection         the underlying transport connection, also passed to the superclass
 * @param blockingCallTimeout         value later returned by {@link #getBlockingCallTimeout()}
 * @param blockingCallFailoverTimeout value later returned by {@link #getBlockingCallFailoverTimeout()}
 * @param incomingInterceptors        interceptors stored on the connection
 * @param outgoingInterceptors        interceptors stored on the connection
 * @param client                      stored as-is; the public constructor passes {@code true}
 * @param nodeID                      stored as-is; the public constructor passes {@code null}
 * @param connectionExecutor          executor passed to the superclass
 */
private RemotingConnectionImpl(final PacketDecoder packetDecoder,
                               final Connection transportConnection,
                               final long blockingCallTimeout,
                               final long blockingCallFailoverTimeout,
                               final List<Interceptor> incomingInterceptors,
                               final List<Interceptor> outgoingInterceptors,
                               final boolean client,
                               final SimpleString nodeID,
                               final Executor connectionExecutor) {
   super(transportConnection, connectionExecutor);

   // Identity of this connection.
   this.client = client;
   this.nodeID = nodeID;

   // Timeouts applied to blocking calls.
   this.blockingCallTimeout = blockingCallTimeout;
   this.blockingCallFailoverTimeout = blockingCallFailoverTimeout;

   // Packet handling.
   this.packetDecoder = packetDecoder;
   this.incomingInterceptors = incomingInterceptors;
   this.outgoingInterceptors = outgoingInterceptors;

   // Publish this object to the transport only after every field has been assigned.
   transportConnection.setProtocolConnection(this);

   if (logger.isTraceEnabled()) {
      logger.trace("RemotingConnectionImpl created: " + this);
   }
}

/**
 * @return the blocking call timeout this connection was constructed with
 */
@Override
public long getBlockingCallTimeout() {
   return this.blockingCallTimeout;
}

/**
 * @return the blocking call failover timeout this connection was constructed with
 */
@Override
public long getBlockingCallFailoverTimeout() {
   return this.blockingCallFailoverTimeout;
}

//......

}

  • RemotingConnectionImpl定义了blockingCallFailoverTimeout属性

waitForFailOver

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java

public final class ChannelImpl implements Channel {

//......

private final Lock lock = new ReentrantLock();

private final Condition sendCondition = lock.newCondition();

private final Condition failoverCondition = lock.newCondition();

private boolean failingOver;

//......

/**
 * Waits on {@code failoverCondition} for an ongoing failover.
 *
 * <p>With a negative blocking-call failover timeout the method keeps waiting for as long as
 * {@code failingOver} is {@code true}. Otherwise it performs one bounded wait through
 * {@code ConcurrentUtil.await}; when that call returns {@code false}, {@code timeoutMsg} is
 * logged at debug level and the method returns normally.
 *
 * <p>The caller is expected to hold {@code lock}, as {@code send} does.
 *
 * @param timeoutMsg message logged at debug level when the bounded wait reports a timeout
 * @throws ActiveMQInterruptedException if the waiting thread is interrupted
 */
private void waitForFailOver(String timeoutMsg) {
   final long failoverTimeout = connection.getBlockingCallFailoverTimeout();

   try {
      if (failoverTimeout >= 0) {
         final boolean withinTimeout = ConcurrentUtil.await(failoverCondition, failoverTimeout);
         if (!withinTimeout) {
            logger.debug(timeoutMsg);
         }
         return;
      }

      // Negative timeout: wait without a time limit until the failover flag is cleared.
      while (failingOver) {
         failoverCondition.await();
      }
   } catch (InterruptedException e) {
      throw new ActiveMQInterruptedException(e);
   }
}

/**
 * Encodes {@code packet} and writes it to the transport connection without waiting for a
 * response.
 *
 * <p>If a failover is in progress ({@code failingOver} is {@code true}) the call first blocks
 * in {@code waitForFailOver}. The write itself happens after {@code lock} has been released.
 *
 * @param packet      the packet to send
 * @param reconnectID passed to {@code checkReconnectID} before the write
 * @param flush       forwarded to the transport connection's {@code write}
 * @param batch       forwarded to the transport connection's {@code write}
 * @return {@code false} if {@code invokeInterceptors} returned a non-null value (nothing is
 *         written), {@code true} once the buffer has been handed to the transport connection
 */
private boolean send(final Packet packet, final int reconnectID, final boolean flush, final boolean batch) {

// A non-null result from the interceptors means the packet must not be sent.

if (invokeInterceptors(packet, interceptors, connection) != null) {

return false;

}

synchronized (sendLock) {

packet.setChannelID(id);

// Packets that expect an asynchronous response get a correlation ID from the response cache.

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {

packet.setCorrelationID(responseAsyncCache.nextCorrelationID());

}

if (logger.isTraceEnabled()) {

logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Sending packet nonblocking " + packet + " on channelID=" + id);

}

ActiveMQBuffer buffer = packet.encode(connection);

lock.lock();

try {

// Block here while a failover is in progress.

if (failingOver) {

waitForFailOver("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " timed-out waiting for fail-over condition on non-blocking send");

}

// Sanity check

if (transferring) {

throw ActiveMQClientMessageBundle.BUNDLE.cannotSendPacketDuringFailover();

}

// Keep packets that require confirmation so they can be resent.

if (resendCache != null && packet.isRequiresConfirmations()) {

addResendPacket(packet);

}

} finally {

lock.unlock();

}

if (logger.isTraceEnabled()) {

logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " Writing buffer for channelID=" + id);

}

checkReconnectID(reconnectID);

// We do this outside the lock because the ResponseCache is thread-safe and this allows responses to come in:

// the add below can block while the response cache is full, which would otherwise prevent responses from being handled.

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {

while (!responseAsyncCache.add(packet)) {

try {

Thread.sleep(1);

} catch (Exception e) {

// Ignored on purpose: keep retrying until the cache accepts the packet.

// NOTE(review): this also swallows InterruptedException without restoring the interrupt flag - confirm this is intended.

}

}

}

// The actual send must be outside the lock, or with OIO transport, the write can block if the tcp

// buffer is full, preventing any incoming buffers being handled and blocking failover

try {

connection.getTransportConnection().write(buffer, flush, batch);

} catch (Throwable t) {

// If the write fails, we must remove the packet from the cache again so that failed sends do not fill it up.

// The caller still learns about the failure because the exception is rethrown below.

if (responseAsyncCache != null && packet.isRequiresResponse() && packet.isResponseAsync()) {

responseAsyncCache.remove(packet.getCorrelationID());

}

throw t;

}

return true;

}

}

/**
 * Marks this channel as failing over.
 *
 * <p>While holding {@code lock}, increments {@code reconnectID} and sets {@code failingOver}
 * to {@code true}, so that a subsequent {@code send} blocks in {@code waitForFailOver}.
 * The lock is released in a {@code finally} block so it cannot stay held if the guarded
 * section throws.
 */
@Override
public void lock() {
   if (logger.isTraceEnabled()) {
      logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " lock channel " + this);
   }

   lock.lock();
   try {
      reconnectID.incrementAndGet();
      failingOver = true;
   } finally {
      lock.unlock();
   }
}

/**
 * Clears the failing-over state of this channel.
 *
 * <p>While holding {@code lock}, sets {@code failingOver} to {@code false} and signals all
 * threads waiting on {@code failoverCondition}. The lock is released in a {@code finally}
 * block so it cannot stay held if the guarded section throws.
 */
@Override
public void unlock() {
   if (logger.isTraceEnabled()) {
      logger.trace("RemotingConnectionID=" + (connection == null ? "NULL" : connection.getID()) + " unlock channel " + this);
   }

   lock.lock();
   try {
      failingOver = false;
      failoverCondition.signalAll();
   } finally {
      lock.unlock();
   }
}

//......

}

  • ChannelImpl的waitForFailOver在connection.getBlockingCallFailoverTimeout()小于0时会循环执行failoverCondition.await()直到failingOver为false(即不限时等待);大于等于0时执行ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout()),若返回false则以debug级别打印timeoutMsg;send方法在failingOver为true时会执行waitForFailOver方法;其lock方法会设置failingOver为true,unlock方法会设置failingOver为false并执行failoverCondition.signalAll()唤醒等待的线程

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

//......

private final Channel sessionChannel;

//......

/**
 * Locks the session channel by calling {@code lock()} on it.
 */
@Override
public void lockCommunications() {
   this.sessionChannel.lock();
}

/**
 * Clears the transferring flag on the session channel and then unlocks the channel.
 */
@Override
public void releaseCommunications() {
   // Order matters: the transferring flag is reset before the channel is unlocked.
   this.sessionChannel.setTransferring(false);
   this.sessionChannel.unlock();
}

//......

}

  • ActiveMQSessionContext的lockCommunications方法会执行sessionChannel.lock(),而releaseCommunications会执行sessionChannel.unlock()

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

//......

/**
 * Locks the session's communications ahead of a failover.
 *
 * <p>Locking the channel keeps packets from being added to the re-send cache during the
 * failover process. It is done before the connection fails over so the session gets a chance
 * to block for failover.
 *
 * @param connection not used by this method
 */
public void preHandleFailover(RemotingConnection connection) {
   this.sessionContext.lockCommunications();
}

/**
 * Moves this session onto {@code backupConnection} after a connection failure.
 *
 * <p>Under this session's monitor the method first tries to reattach the session on the new
 * connection. If that fails, the session gets a new generated name, its consumers are cleared,
 * and - unless the session is in close or may not attempt failover - the session and its
 * non-closed consumers are recreated on the server. Finally the blocking call is released via
 * {@code sessionContext.returnBlocking(cause)}. Communications are always released in the
 * {@code finally} block. After leaving the monitor, a copy of the session metadata is re-sent.
 *
 * @param backupConnection the connection to reattach to or recreate the session on
 * @param cause            the failure passed on to {@code sessionContext.returnBlocking}
 * @return {@code true} if the session is closed or the failover handling completed without a
 *         {@code Throwable}; {@code false} if a {@code Throwable} was caught and logged
 */
public boolean handleFailover(final RemotingConnection backupConnection, ActiveMQException cause) {

boolean suc = true;

synchronized (this) {

// Nothing to fail over for a closed session.

if (closed) {

return true;

}

boolean resetCreditManager = false;

try {

// TODO remove this and encapsulate it

boolean reattached = sessionContext.reattachOnNewConnection(backupConnection);

if (!reattached) {

// We change the name of the Session, otherwise the server could close it while we are still sending the recreate

// in certain failure scenarios.

// For instance, the fact that we didn't change the name of the session after failover or reconnect

// was the reason allowing multiple Sessions to be closed simultaneously, breaking concurrency.

this.name = UUIDGenerator.getInstance().generateStringUUID();

sessionContext.resetName(name);

Map<ConsumerContext, ClientConsumerInternal> clonedConsumerEntries = cloneConsumerEntries();

for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {

consumer.clearAtFailover();

}

// The session wasn't found on the server - probably we're failing over onto a backup server where the

// session won't exist or the target server has been restarted - in this case the session will need to be

// recreated, and we'll need to recreate any consumers.

// It could also be that the server hasn't been restarted, but the session is currently executing close,

// and that has already been executed on the server, that's why we can't find the session - in this case

// we *don't* want to recreate the session, we just want to unblock the blocking call.

if (!inClose && mayAttemptToFailover) {

sessionContext.recreateSession(username, password, minLargeMessageSize, xa, autoCommitSends, autoCommitAcks, preAcknowledge);

// Recreate every consumer that has not been closed in the meantime.

for (Map.Entry<ConsumerContext, ClientConsumerInternal> entryx : clonedConsumerEntries.entrySet()) {

ClientConsumerInternal consumerInternal = entryx.getValue();

synchronized (consumerInternal) {

if (!consumerInternal.isClosed()) {

sessionContext.recreateConsumerOnServer(consumerInternal, entryx.getKey().getId(), started);

}

}

}

// Work done without auto-commit on both sends and acks: mark the session rollback-only.

if ((!autoCommitAcks || !autoCommitSends) && workDone) {

// this is protected by a lock, so we can guarantee nothing will sneak here

// while we do our work here

rollbackOnly = true;

}

// A current XID is reported as failed and the session is marked rollback-only.

if (currentXID != null) {

sessionContext.xaFailed(currentXID);

rollbackOnly = true;

}

// Now start the session if it was already started

if (started) {

for (ClientConsumerInternal consumer : clonedConsumerEntries.values()) {

consumer.clearAtFailover();

consumer.start();

}

sessionContext.restartSession();

}

resetCreditManager = true;

}

sessionContext.returnBlocking(cause);

}

} catch (Throwable t) {

// Any failure is logged and reported to the caller through the return value.

ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);

suc = false;

} finally {

// Always release the communications again, whatever happened above.

sessionContext.releaseCommunications();

}

if (resetCreditManager) {

synchronized (producerCreditManager) {

producerCreditManager.reset();

}

// Also need to send more credits for consumers, otherwise the system could hang with the server

// not having any credits to send

}

}

// Snapshot the metadata under its own monitor, then re-send it outside of both monitors.

HashMap<String, String> metaDataToSend;

synchronized (metadata) {

metaDataToSend = new HashMap<>(metadata);

}

sessionContext.resetMetadata(metaDataToSend);

return suc;

}

//......

}

  • ClientSessionImpl的preHandleFailover方法会执行sessionContext.lockCommunications(),而handleFailover方法在recreateConsumerOnServer之后的finally里头会执行sessionContext.releaseCommunications()

小结

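RemotingConnectionImpl定义了blockingCallFailoverTimeout属性(即callFailoverTimeout,由ClientSessionFactoryImpl的establishNewConnection经clientProtocolManager.connect传入);ChannelImpl的waitForFailOver在connection.getBlockingCallFailoverTimeout()小于0时会一直等待直到failingOver为false,大于等于0时执行ConcurrentUtil.await(failoverCondition, connection.getBlockingCallFailoverTimeout());send方法在failingOver为true时会执行waitForFailOver方法;其lock方法会设置failingOver为true,unlock方法会设置failingOver为false;ClientSessionImpl的preHandleFailover通过sessionContext.lockCommunications()触发lock,handleFailover则在finally里头通过sessionContext.releaseCommunications()触发unlock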

doc

  • RemotingConnectionImpl

以上是 聊聊artemis的callFailoverTimeout 的全部内容, 来源链接: utcz.com/z/513546.html

回到顶部