A Look at Artemis's persistenceEnabled


persistenceEnabled

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java

public class ConfigurationImpl implements Configuration, Serializable {

    //......

    private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();

    public boolean isPersistenceEnabled() {
        return persistenceEnabled;
    }

    public ConfigurationImpl setPersistenceEnabled(final boolean enable) {
        persistenceEnabled = enable;
        return this;
    }

    //......

}

  • ConfigurationImpl defines the persistenceEnabled property; its default is ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled(), which is true.
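
The setter returns the configuration object itself, so calls can be chained. Here is a minimal sketch of that pattern. `SketchConfiguration`, its `name` field, and `PersistenceFlagDemo` are hypothetical stand-ins, not Artemis classes, and the hard-coded default of `true` only mirrors the default reported above.

```java
// Hypothetical stand-in for ConfigurationImpl; this is not the Artemis class.
final class SketchConfiguration {

    // Assumed default, mirroring the 'true' default described in the article.
    private boolean persistenceEnabled = true;

    // Extra illustrative property so that chaining has something to chain with.
    private String name = "sketch";

    boolean isPersistenceEnabled() {
        return persistenceEnabled;
    }

    // Returning 'this' lets callers chain several setters in one expression.
    SketchConfiguration setPersistenceEnabled(final boolean enable) {
        persistenceEnabled = enable;
        return this;
    }

    String getName() {
        return name;
    }

    SketchConfiguration setName(final String newName) {
        name = newName;
        return this;
    }
}

final class PersistenceFlagDemo {
    public static void main(String[] args) {
        SketchConfiguration defaults = new SketchConfiguration();
        SketchConfiguration inMemory = new SketchConfiguration()
                .setName("in-memory")
                .setPersistenceEnabled(false);

        System.out.println(defaults.getName() + " -> " + defaults.isPersistenceEnabled());
        System.out.println(inMemory.getName() + " -> " + inMemory.isPersistenceEnabled());
    }
}
```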

createStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

    //......

    protected StorageManager createStorageManager() {
        if (configuration.isPersistenceEnabled()) {
            if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
                JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
                this.getCriticalAnalyzer().add(journal);
                return journal;
            } else {
                // Default to File Based Storage Manager, (Legacy default configuration).
                JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
                this.getCriticalAnalyzer().add(journal);
                return journal;
            }
        }
        return new NullStorageManager();
    }

    //......

}

  • When configuration.isPersistenceEnabled() is true, the createStorageManager method of ActiveMQServerImpl creates a JDBCJournalStorageManager (if the store type is DATABASE) or otherwise a JournalStorageManager; when it is false, the method creates a NullStorageManager.
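
The selection logic can be reduced to a three-way decision. The sketch below models only that decision. Every type in it (`SketchStoreType`, `SketchStorageManager`, and the three implementations) is a hypothetical simplification; the real Artemis classes take many constructor arguments and are also registered with the critical analyzer, which is omitted here.

```java
// Hypothetical, simplified types; none of these are the real Artemis classes.
enum SketchStoreType { FILE, DATABASE }

interface SketchStorageManager {
    String describe();
}

final class SketchJdbcStorage implements SketchStorageManager {
    public String describe() { return "jdbc-journal"; }
}

final class SketchFileStorage implements SketchStorageManager {
    public String describe() { return "file-journal"; }
}

final class SketchNullStorage implements SketchStorageManager {
    public String describe() { return "null"; }
}

final class StorageSelectionSketch {

    // storeType may be null, which models "no store configuration present".
    static SketchStorageManager create(boolean persistenceEnabled, SketchStoreType storeType) {
        if (persistenceEnabled) {
            if (storeType == SketchStoreType.DATABASE) {
                return new SketchJdbcStorage();
            }
            // Anything else falls back to the file-based journal.
            return new SketchFileStorage();
        }
        // Persistence disabled: the store type is never consulted.
        return new SketchNullStorage();
    }

    public static void main(String[] args) {
        System.out.println(create(true, SketchStoreType.DATABASE).describe());
        System.out.println(create(true, null).describe());
        System.out.println(create(false, SketchStoreType.DATABASE).describe());
    }
}
```

Note that in this sketch, as in the original method, the store type only matters once persistence is enabled.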

processRoute

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

    //......

    public void processRoute(final Message message,
                             final RoutingContext context,
                             final boolean direct) throws Exception {
        final List<MessageReference> refs = new ArrayList<>();

        Transaction tx = context.getTransaction();

        Long deliveryTime = message.getScheduledDeliveryTime();

        for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet()) {
            PagingStore store = pagingManager.getPageStore(entry.getKey());

            if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) {
                if (message.isLargeMessage()) {
                    confirmLargeMessageSend(tx, message);
                }

                // We need to kick delivery so the Queues may check for the cursors case they are empty
                schedulePageDelivery(tx, entry);
                continue;
            }

            for (Queue queue : entry.getValue().getNonDurableQueues()) {
                MessageReference reference = MessageReference.Factory.createReference(message, queue);

                if (deliveryTime != null) {
                    reference.setScheduledDeliveryTime(deliveryTime);
                }
                refs.add(reference);

                message.incrementRefCount();
            }

            Iterator<Queue> iter = entry.getValue().getDurableQueues().iterator();

            while (iter.hasNext()) {
                Queue queue = iter.next();

                MessageReference reference = MessageReference.Factory.createReference(message, queue);

                if (context.isAlreadyAcked(context.getAddress(message), queue)) {
                    reference.setAlreadyAcked();
                    if (tx != null) {
                        queue.acknowledge(tx, reference);
                    }
                }

                if (deliveryTime != null) {
                    reference.setScheduledDeliveryTime(deliveryTime);
                }
                refs.add(reference);

                if (message.isDurable()) {
                    int durableRefCount = message.incrementDurableRefCount();

                    if (durableRefCount == 1) {
                        if (tx != null) {
                            storageManager.storeMessageTransactional(tx.getID(), message);
                        } else {
                            storageManager.storeMessage(message);
                        }

                        if (message.isLargeMessage()) {
                            confirmLargeMessageSend(tx, message);
                        }
                    }

                    if (tx != null) {
                        storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());

                        tx.setContainsPersistent();
                    } else {
                        storageManager.storeReference(queue.getID(), message.getMessageID(), !iter.hasNext());
                    }

                    if (deliveryTime != null && deliveryTime > 0) {
                        if (tx != null) {
                            storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
                        } else {
                            storageManager.updateScheduledDeliveryTime(reference);
                        }
                    }
                }

                message.incrementRefCount();
            }
        }

        if (tx != null) {
            tx.addOperation(new AddOperation(refs));
        } else {
            // This will use the same thread if there are no pending operations
            // avoiding a context switch on this case
            storageManager.afterCompleteOperations(new IOCallback() {
                @Override
                public void onError(final int errorCode, final String errorMessage) {
                    ActiveMQServerLogger.LOGGER.ioErrorAddingReferences(errorCode, errorMessage);
                }

                @Override
                public void done() {
                    context.processReferences(refs, direct);
                }
            });
        }
    }

    //......

}

  • For each durable queue, the processRoute method of PostOfficeImpl checks message.isDurable(); if it is true and durableRefCount is 1, it calls storageManager.storeMessage, or storageManager.storeMessageTransactional when a transaction is present.
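
The effect of the `durableRefCount == 1` check is that a durable message routed to several durable queues is stored once, while a reference is stored for every queue. The sketch below illustrates just that counting rule for the non-transactional path. `RecordingStore`, `SketchMessage`, and `DurableRouteSketch` are hypothetical names, and paging, transactions, scheduled delivery, and large messages are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical recorder standing in for a StorageManager; it only logs calls.
final class RecordingStore {
    final List<String> log = new ArrayList<>();

    void storeMessage(long messageId) {
        log.add("message:" + messageId);
    }

    void storeReference(long queueId, long messageId) {
        log.add("ref:" + queueId + "->" + messageId);
    }
}

// Hypothetical message carrying only what the counting rule needs.
final class SketchMessage {
    final long id;
    final boolean durable;
    private int durableRefCount;

    SketchMessage(long id, boolean durable) {
        this.id = id;
        this.durable = durable;
    }

    int incrementDurableRefCount() {
        return ++durableRefCount;
    }
}

final class DurableRouteSketch {

    // Non-transactional path only: store the body once, then one reference per queue.
    static void route(SketchMessage message, long[] durableQueueIds, RecordingStore store) {
        for (long queueId : durableQueueIds) {
            if (message.durable) {
                if (message.incrementDurableRefCount() == 1) {
                    store.storeMessage(message.id);
                }
                store.storeReference(queueId, message.id);
            }
        }
    }

    public static void main(String[] args) {
        RecordingStore store = new RecordingStore();
        route(new SketchMessage(7L, true), new long[] {1L, 2L, 3L}, store);
        // prints [message:7, ref:1->7, ref:2->7, ref:3->7]
        System.out.println(store.log);
    }
}
```

A non-durable message passed through the same `route` method leaves the log empty, which matches the way the original code skips all storage calls when message.isDurable() is false.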

storeMessage

AbstractJournalStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

    //......

    protected Journal messageJournal;

    //......

    public void storeMessage(final Message message) throws Exception {
        if (message.getMessageID() <= 0) {
            // Sanity check only... this shouldn't happen unless there is a bug
            throw ActiveMQMessageBundle.BUNDLE.messageIdNotAssigned();
        }

        readLock();
        try {
            // Note that we don't sync, the add reference that comes immediately after will sync if
            // appropriate

            if (message.isLargeMessage()) {
                messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_LARGE_MESSAGE, LargeMessagePersister.getInstance(), message, false, getContext(false));
            } else {
                messageJournal.appendAddRecord(message.getMessageID(), JournalRecordIds.ADD_MESSAGE_PROTOCOL, message.getPersister(), message, false, getContext(false));
            }
        } finally {
            readUnLock();
        }
    }

    //......

}

  • Both JDBCJournalStorageManager and JournalStorageManager extend AbstractJournalStorageManager, whose storeMessage method calls messageJournal.appendAddRecord. The two differ in their messageJournal implementation: one uses JDBCJournalImpl, the other JournalImpl.

NullStorageManager

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java

public class NullStorageManager implements StorageManager {

    //......

    public void storeMessage(final Message message) throws Exception {
    }

    //......

}

  • NullStorageManager implements the StorageManager interface; its storeMessage method is empty.
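
An empty implementation like this is commonly called a null object: the caller keeps a single code path and never has to check whether persistence is switched on. Below is a rough sketch of the idea. `MessageSink`, `ListBackedSink`, `NoOpSink`, and `NullObjectSketch` are hypothetical names, and the in-memory list is only a placeholder for a real journal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface; storedCount() exists only so the sketch can be observed.
interface MessageSink {
    void storeMessage(String body);

    int storedCount();
}

// Placeholder for a journal-backed manager: it keeps what it is given.
final class ListBackedSink implements MessageSink {
    private final List<String> records = new ArrayList<>();

    public void storeMessage(String body) {
        records.add(body);
    }

    public int storedCount() {
        return records.size();
    }
}

// Null object: same interface, but storeMessage intentionally does nothing.
final class NoOpSink implements MessageSink {
    public void storeMessage(String body) {
        // intentionally empty
    }

    public int storedCount() {
        return 0;
    }
}

final class NullObjectSketch {

    // The caller never branches on whether persistence is enabled.
    static int send(MessageSink sink, String... bodies) {
        for (String body : bodies) {
            sink.storeMessage(body);
        }
        return sink.storedCount();
    }

    public static void main(String[] args) {
        System.out.println(send(new ListBackedSink(), "a", "b")); // prints 2
        System.out.println(send(new NoOpSink(), "a", "b"));       // prints 0
    }
}
```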

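Summary

ConfigurationImpl defines the persistenceEnabled property, which defaults to ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled() (true). When configuration.isPersistenceEnabled() is true, the createStorageManager method of ActiveMQServerImpl creates a JDBCJournalStorageManager or a JournalStorageManager; otherwise it creates a NullStorageManager. The processRoute method of PostOfficeImpl checks message.isDurable(), and if it is true and durableRefCount is 1, it calls storageManager.storeMessage or storageManager.storeMessageTransactional. Because NullStorageManager's storeMessage is an empty method, that call writes nothing when persistenceEnabled is false.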

doc

  • ConfigurationImpl

