聊聊artemis的transactionTimeoutScanPeriod

编程

transactionTimeoutScanPeriod

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java

public class ConfigurationImpl implements Configuration, Serializable {

//......

private long transactionTimeoutScanPeriod = ActiveMQDefaultConfiguration.getDefaultTransactionTimeoutScanPeriod();

//......

@Override

public long getTransactionTimeoutScanPeriod() {

return transactionTimeoutScanPeriod;

}

@Override

public ConfigurationImpl setTransactionTimeoutScanPeriod(final long period) {

transactionTimeoutScanPeriod = period;

return this;

}

//......

}

  • ConfigurationImpl定义了transactionTimeoutScanPeriod属性,默认为1000

ActiveMQServerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

//......

synchronized boolean initialisePart1(boolean scalingDown) throws Exception {

//......

resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), configuration.getTransactionTimeoutScanPeriod(), scheduledPool);

//......

}

//......

}

  • ActiveMQServerImpl的initialisePart1使用configuration.getTransactionTimeout()、configuration.getTransactionTimeoutScanPeriod()、scheduledPool创建了ResourceManagerImpl

ResourceManagerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java

public class ResourceManagerImpl implements ResourceManager {

private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<>();

private final List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<>();

private final int defaultTimeoutSeconds;

private boolean started = false;

private TxTimeoutHandler task;

private final long txTimeoutScanPeriod;

private final ScheduledExecutorService scheduledThreadPool;

public ResourceManagerImpl(final int defaultTimeoutSeconds,

final long txTimeoutScanPeriod,

final ScheduledExecutorService scheduledThreadPool) {

this.defaultTimeoutSeconds = defaultTimeoutSeconds;

this.txTimeoutScanPeriod = txTimeoutScanPeriod;

this.scheduledThreadPool = scheduledThreadPool;

}

// ActiveMQComponent implementation

@Override

public int size() {

return transactions.size();

}

@Override

public void start() throws Exception {

if (started) {

return;

}

task = new TxTimeoutHandler();

Future<?> future = scheduledThreadPool.scheduleAtFixedRate(task, txTimeoutScanPeriod, txTimeoutScanPeriod, TimeUnit.MILLISECONDS);

task.setFuture(future);

started = true;

}

@Override

public void stop() throws Exception {

if (!started) {

return;

}

if (task != null) {

task.close();

}

started = false;

}

//......

}

  • ResourceManagerImpl实现了ResourceManager接口,其start方法创建了TxTimeoutHandler,并以txTimeoutScanPeriod的fixedRate去调度执行

TxTimeoutHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/ResourceManagerImpl.java

   private class TxTimeoutHandler implements Runnable {

private boolean closed = false;

private Future<?> future;

@Override

public void run() {

if (closed) {

return;

}

Set<Transaction> timedoutTransactions = new HashSet<>();

long now = System.currentTimeMillis();

for (Transaction tx : transactions.values()) {

if (tx.hasTimedOut(now, defaultTimeoutSeconds)) {

Transaction removedTX = removeTransaction(tx.getXid());

if (removedTX != null) {

ActiveMQServerLogger.LOGGER.timedOutXID(removedTX.getXid());

timedoutTransactions.add(removedTX);

}

}

}

for (Transaction failedTransaction : timedoutTransactions) {

try {

failedTransaction.rollback();

} catch (Exception e) {

ActiveMQServerLogger.LOGGER.errorTimingOutTX(e, failedTransaction.getXid());

}

}

}

synchronized void setFuture(final Future<?> future) {

this.future = future;

}

void close() {

if (future != null) {

future.cancel(false);

}

closed = true;

}

}

  • TxTimeoutHandler实现了Runnable接口,其run方法会遍历transactions,挨个执行tx.hasTimedOut(now, defaultTimeoutSeconds),对于timeout的则执行removeTransaction(tx.getXid()),之后挨个执行rollback

小结

ActiveMQServerImpl的initialisePart1使用configuration.getTransactionTimeout()、configuration.getTransactionTimeoutScanPeriod()、scheduledPool创建了ResourceManagerImpl;ResourceManagerImpl实现了ResourceManager接口,其start方法创建了TxTimeoutHandler,并以txTimeoutScanPeriod的fixedRate去调度执行;TxTimeoutHandler实现了Runnable接口,其run方法会遍历transactions,挨个执行tx.hasTimedOut(now, defaultTimeoutSeconds),对于timeout的则执行removeTransaction(tx.getXid()),之后挨个执行rollback

doc

  • ResourceManagerImpl

以上是 聊聊artemis的transactionTimeoutScanPeriod 的全部内容, 来源链接: utcz.com/z/513238.html

回到顶部