A Look at Artemis's ExpiryScanner

startExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   private ExpiryReaper expiryReaperRunnable;

   //......

   public synchronized void startExpiryScanner() {
      if (expiryReaperPeriod > 0) {
         if (expiryReaperRunnable != null)
            expiryReaperRunnable.stop();
         expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), expiryReaperPeriod, TimeUnit.MILLISECONDS, false);

         expiryReaperRunnable.start();
      }
   }

   public synchronized void stop() throws Exception {
      started = false;

      managementService.removeNotificationListener(this);

      if (expiryReaperRunnable != null)
         expiryReaperRunnable.stop();

      if (addressQueueReaperRunnable != null)
         addressQueueReaperRunnable.stop();

      addressManager.clear();

      queueInfos.clear();
   }

   //......

}

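  • PostOfficeImpl's startExpiryScanner method runs only when expiryReaperPeriod is greater than 0: it stops any existing expiryReaperRunnable, creates a new ExpiryReaper, and calls expiryReaperRunnable.start(). Its stop method calls expiryReaperRunnable.stop().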

ExpiryReaper

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

   private final class ExpiryReaper extends ActiveMQScheduledComponent {

      ExpiryReaper(ScheduledExecutorService scheduledExecutorService,
                   Executor executor,
                   long checkPeriod,
                   TimeUnit timeUnit,
                   boolean onDemand) {
         super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
      }

      @Override
      public void run() {
         // The reaper thread should be finished case the PostOffice is gone
         // This is to avoid leaks on PostOffice between stops and starts
         for (Queue queue : getLocalQueues()) {
            try {
               queue.expireReferences();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
            }
         }
      }
   }

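  • ExpiryReaper extends ActiveMQScheduledComponent. Its run method iterates over the local queues and calls queue.expireReferences() on each one; an exception from one queue is logged and does not stop the loop.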
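The per-queue try/catch is what keeps one failing queue from blocking expiry on the others. A minimal standalone sketch of that pattern follows; ReaperLoop and runAll are hypothetical names used only for illustration, and plain Runnable tasks stand in for queue.expireReferences().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; ReaperLoop and runAll are illustrative names, not Artemis classes.
final class ReaperLoop {

    // Runs every task in order. A task that throws is recorded in `errors`
    // and the loop moves on to the next one. Returns how many tasks succeeded.
    static int runAll(List<Runnable> tasks, List<Exception> errors) {
        int succeeded = 0;
        for (Runnable task : tasks) {
            try {
                task.run();
                succeeded++;
            } catch (Exception e) {
                errors.add(e); // the real reaper logs through ActiveMQServerLogger instead
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<Runnable> tasks = new ArrayList<>();
        tasks.add(() -> { });                                               // queue A: fine
        tasks.add(() -> { throw new IllegalStateException("queue B"); });   // queue B: fails
        tasks.add(() -> { });                                               // queue C: still runs

        List<Exception> errors = new ArrayList<>();
        int ok = runAll(tasks, errors);
        System.out.println(ok + " succeeded, " + errors.size() + " failed");
    }
}
```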

expireReferences

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private final ExpiryScanner expiryScanner = new ExpiryScanner();

   //......

   public void expireReferences() {
      if (isExpirationRedundant()) {
         return;
      }

      if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
         expiryScanner.scannerRunning.incrementAndGet();
         getExecutor().execute(expiryScanner);
      }
   }

   //......

}

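  • QueueImpl's expireReferences method submits expiryScanner to the executor, but only if expiration is not redundant, the queue has not been destroyed, and no scan is already pending (scannerRunning is 0).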
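The scannerRunning counter acts as a guard so that repeated reaper ticks do not pile up scans for the same queue. Below is a rough sketch of that guard in isolation. ScanGuard and scansExecuted are hypothetical names, the scan body is a stand-in, and the toy executor simply queues tasks so the pending state can be observed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "schedule only if no scan is pending" guard.
final class ScanGuard {
    final AtomicInteger scannerRunning = new AtomicInteger(0);
    int scansExecuted = 0;

    private final Runnable scanner = () -> {
        try {
            scansExecuted++; // stand-in for the real scan
        } finally {
            scannerRunning.decrementAndGet(); // allow the next scan to be scheduled
        }
    };

    // Returns true if a scan was submitted, false if one was already pending.
    boolean expireReferences(Executor executor) {
        if (scannerRunning.get() == 0) {
            scannerRunning.incrementAndGet();
            executor.execute(scanner);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<Runnable> pending = new ArrayDeque<>();
        Executor deferred = pending::add; // queues tasks instead of running them

        ScanGuard guard = new ScanGuard();
        System.out.println(guard.expireReferences(deferred)); // submitted
        System.out.println(guard.expireReferences(deferred)); // skipped, one is pending
        pending.remove().run();                               // the scan completes
        System.out.println(guard.expireReferences(deferred)); // submitted again
    }
}
```

Like the original, the sketch checks the counter and then increments it in two separate steps; it does not attempt to make that pair atomic.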

ExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   class ExpiryScanner implements Runnable {

      public AtomicInteger scannerRunning = new AtomicInteger(0);

      @Override
      public void run() {

         boolean expired = false;
         boolean hasElements = false;
         int elementsExpired = 0;

         LinkedList<MessageReference> expiredMessages = new LinkedList<>();
         synchronized (QueueImpl.this) {
            if (queueDestroyed) {
               return;
            }

            if (logger.isDebugEnabled()) {
               logger.debug("Scanning for expires on " + QueueImpl.this.getName());
            }

            LinkedListIterator<MessageReference> iter = iterator();

            try {
               while (postOffice.isStarted() && iter.hasNext()) {
                  hasElements = true;
                  MessageReference ref = iter.next();
                  if (ref.getMessage().isExpired()) {
                     incDelivering(ref);
                     expired = true;
                     expiredMessages.add(ref);
                     iter.remove();

                     if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
                        logger.debug("Breaking loop of expiring");
                        scannerRunning.incrementAndGet();
                        getExecutor().execute(this);
                        break;
                     }
                  }
               }
            } finally {
               try {
                  iter.close();
               } catch (Throwable ignored) {
               }
               scannerRunning.decrementAndGet();
               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");
            }
         }

         if (!expiredMessages.isEmpty()) {
            Transaction tx = new TransactionImpl(storageManager);
            for (MessageReference ref : expiredMessages) {
               if (tx == null) {
                  tx = new TransactionImpl(storageManager);
               }
               try {
                  expire(tx, ref);
                  refRemoved(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
               }
            }

            try {
               tx.commit();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
            }
            logger.debug("Expired " + elementsExpired + " references");
         }

         // If empty we need to schedule depaging to make sure we would depage expired messages as well
         if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
            scheduleDepage(true);
         }
      }
   }

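  • ExpiryScanner implements Runnable. Its run method walks the queue's MessageReference iterator and checks each message with isExpired(). For an expired message it calls incDelivering, adds the reference to expiredMessages, and removes it from the iterator. It then increments elementsExpired and, once that reaches MAX_DELIVERIES_IN_LOOP (1000), re-submits itself to the executor and breaks out of the loop. After the scan it iterates over expiredMessages, calling expire(tx, ref) and refRemoved(ref) for each, and finally calls tx.commit().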
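The batch limit bounds how long a single pass holds the queue lock: once the cap is hit, the scanner re-queues itself and lets other work run. The sketch below illustrates only that re-submission idea under simplifying assumptions. BatchedScan, MAX_PER_PASS, and drain are hypothetical names, a list of expiration timestamps stands in for the message references, and a plain ArrayDeque stands in for the executor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

// Hypothetical sketch of a batch-limited scan that re-submits itself.
final class BatchedScan implements Runnable {
    static final int MAX_PER_PASS = 3; // stands in for MAX_DELIVERIES_IN_LOOP

    final List<Long> expirations = new LinkedList<>();        // 0 = never expires
    final List<Long> removed = new ArrayList<>();             // expired entries, in removal order
    final ArrayDeque<Runnable> executor = new ArrayDeque<>(); // toy executor
    final long now;

    BatchedScan(long now) {
        this.now = now;
    }

    @Override
    public void run() {
        int expiredInPass = 0;
        Iterator<Long> iter = expirations.iterator();
        while (iter.hasNext()) {
            long expiration = iter.next();
            if (expiration != 0 && now - expiration >= 0) {
                removed.add(expiration);
                iter.remove();
                if (++expiredInPass >= MAX_PER_PASS) {
                    executor.add(this); // continue in a later pass
                    break;
                }
            }
        }
    }

    // Runs passes until nothing is queued; returns the number of passes.
    int drain() {
        int passes = 0;
        executor.add(this);
        while (!executor.isEmpty()) {
            executor.remove().run();
            passes++;
        }
        return passes;
    }

    public static void main(String[] args) {
        BatchedScan scan = new BatchedScan(100L);
        for (long e : new long[] {10, 20, 0, 30, 40, 500, 50}) {
            scan.expirations.add(e);
        }
        int passes = scan.drain();
        System.out.println(passes + " passes, removed " + scan.removed + ", left " + scan.expirations);
    }
}
```

With five expired entries and a cap of three per pass, the first pass stops after the third removal and the second pass picks up the rest.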

isExpired

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

public interface Message {

   //......

   default boolean isExpired() {
      if (getExpiration() == 0) {
         return false;
      }

      return System.currentTimeMillis() - getExpiration() >= 0;
   }

   //......

}

  • Message's isExpired method returns false when expiration is 0 (no expiration set); otherwise it returns true once the current time minus expiration is greater than or equal to 0.
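To make the boundary cases concrete, here is a small standalone mirror of that check. ExpiryCheck and its two-argument isExpired are hypothetical; the current time is passed in as a parameter instead of being read from System.currentTimeMillis() so that the results are deterministic.

```java
// Hypothetical stand-alone mirror of Message.isExpired(); not part of Artemis.
final class ExpiryCheck {

    // expiration == 0 means "never expires"; otherwise the message is expired
    // once the clock reaches the expiration timestamp (inclusive).
    static boolean isExpired(long expiration, long now) {
        if (expiration == 0) {
            return false;
        }
        return now - expiration >= 0;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(isExpired(0L, now));     // no expiration set
        System.out.println(isExpired(999L, now));   // deadline already passed
        System.out.println(isExpired(1_000L, now)); // deadline reached exactly
        System.out.println(isExpired(1_001L, now)); // deadline still in the future
    }
}
```

Note that a message whose expiration equals the current millisecond already counts as expired.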

expire

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void expire(final Transaction tx, final MessageReference ref) throws Exception {
      SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();

      if (expiryAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
            acknowledge(tx, ref, AckReason.EXPIRED, null);
         } else {
            move(expiryAddress, tx, ref, true, true);
         }
      } else {
         if (!printErrorExpiring) {
            printErrorExpiring = true;
            // print this only once
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
         }

         acknowledge(tx, ref, AckReason.EXPIRED, null);
      }

      if (server != null && server.hasBrokerMessagePlugins()) {
         ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
         if (expiryLogger == null) {
            expiryLogger = new ExpiryLogger();
            tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
            tx.addOperation(expiryLogger);
         }

         expiryLogger.addExpiry(address, ref);
      }
   }

   //......

}

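  • The expire method first looks up the expiryAddress. If one is configured, it fetches the bindingList via postOffice.lookupBindingsForAddress(expiryAddress); when the list is non-null and non-empty it performs a move, otherwise it logs an error and acknowledges the reference with AckReason.EXPIRED. If no expiryAddress is configured, it logs an error (once only) and likewise acknowledges with AckReason.EXPIRED.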
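The branching in expire boils down to a three-way check with two outcomes. The sketch below restates it as a pure function; ExpireDecision, Action, and decide are hypothetical names, and plain strings stand in for SimpleString addresses and Bindings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical restatement of the branch logic in QueueImpl.expire().
final class ExpireDecision {
    enum Action { MOVE_TO_EXPIRY_ADDRESS, ACKNOWLEDGE_ONLY }

    // expiryAddress: configured expiry address, or null if none is set.
    // bindings: bindings found for that address, or null if the lookup found nothing.
    static Action decide(String expiryAddress, List<String> bindings) {
        if (expiryAddress == null) {
            return Action.ACKNOWLEDGE_ONLY; // no expiry address: the message is dropped
        }
        if (bindings == null || bindings.isEmpty()) {
            return Action.ACKNOWLEDGE_ONLY; // address has no queue bound: also dropped
        }
        return Action.MOVE_TO_EXPIRY_ADDRESS;
    }

    public static void main(String[] args) {
        System.out.println(decide(null, null));
        System.out.println(decide("ExpiryQueue", Collections.emptyList()));
        System.out.println(decide("ExpiryQueue", Arrays.asList("ExpiryQueue")));
    }
}
```

In both ACKNOWLEDGE_ONLY cases the expired message is removed from the queue without being copied anywhere, which is why the original code logs an error for them.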

move

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void move(final SimpleString toAddress,
                     final Transaction tx,
                     final MessageReference ref,
                     final boolean expiry,
                     final boolean rejectDuplicate,
                     final long... queueIDs) throws Exception {
      Message copyMessage = makeCopy(ref, expiry);

      copyMessage.setAddress(toAddress);

      if (queueIDs != null && queueIDs.length > 0) {
         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
         for (long id : queueIDs) {
            buffer.putLong(id);
         }
         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
      }

      postOffice.route(copyMessage, tx, false, rejectDuplicate);

      if (expiry) {
         acknowledge(tx, ref, AckReason.EXPIRED, null);
      } else {
         acknowledge(tx, ref);
      }
   }

   //......

}

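  • The move method first copies the message with makeCopy to get copyMessage and sets its address to toAddress. If queueIDs are supplied, it packs them into the HDR_ROUTE_TO_IDS bytes property. It then re-routes the copy via postOffice.route(copyMessage, tx, false, rejectDuplicate) and finally acknowledges the original reference, using AckReason.EXPIRED when expiry is true.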
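The queueIDs handling packs each long into 8 bytes of a single array. The sketch below reproduces that packing with java.nio.ByteBuffer. RouteIds, pack, and unpack are hypothetical names; unpack is included only to show that the encoding round-trips and is an assumption about how a reader could decode it, not code taken from Artemis.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of packing queue IDs into a byte[] property value.
final class RouteIds {

    // Writes each ID as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] pack(long... queueIDs) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * queueIDs.length);
        for (long id : queueIDs) {
            buffer.putLong(id);
        }
        return buffer.array();
    }

    // Assumed inverse of pack(): reads the bytes back 8 at a time.
    static long[] unpack(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long[] ids = new long[bytes.length / Long.BYTES];
        for (int i = 0; i < ids.length; i++) {
            ids[i] = buffer.getLong();
        }
        return ids;
    }

    public static void main(String[] args) {
        byte[] packed = pack(7L, 42L);
        System.out.println(packed.length + " bytes");
        System.out.println(Arrays.toString(unpack(packed)));
    }
}
```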

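Summary

  • ExpiryScanner implements Runnable. Its run method walks the queue's MessageReference iterator and checks whether each message is expired; if so, it calls incDelivering, adds the reference to expiredMessages, and removes it from the iterator. It then iterates over expiredMessages, calling expire(tx, ref) and refRemoved(ref) for each, and finally calls tx.commit().
  • The expire method first looks up the expiryAddress, then fetches the bindingList via postOffice.lookupBindingsForAddress(expiryAddress); if bindings exist it performs a move, otherwise it only acknowledges the reference with AckReason.EXPIRED.
  • The move method first copies the message with makeCopy to get copyMessage, re-routes it via postOffice.route(copyMessage, tx, false, rejectDuplicate), and finally acknowledges the original reference, using AckReason.EXPIRED when expiry is true.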

doc

  • QueueImpl
