A look at the duplicateProperty of Artemis messages

Message

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

public interface Message {

   //......

   default Object getDuplicateProperty() {
      return null;
   }

   default byte[] getDuplicateIDBytes() {
      Object duplicateID = getDuplicateProperty();

      if (duplicateID == null) {
         return null;
      } else {
         if (duplicateID instanceof SimpleString) {
            return ((SimpleString) duplicateID).getData();
         } else if (duplicateID instanceof String) {
            return new SimpleString(duplicateID.toString()).getData();
         } else {
            return (byte[]) duplicateID;
         }
      }
   }

   //......

}

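  • The Message interface defines the getDuplicateProperty and getDuplicateIDBytes methods. getDuplicateIDBytes reads the value returned by getDuplicateProperty and converts it to a byte array: a SimpleString or String goes through SimpleString.getData(), any other non-null value is cast to byte[], and null is returned as null.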
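
To make the branching concrete, here is a minimal JDK-only sketch of the same conversion. DuplicateIdBytes and toBytes are hypothetical names, not Artemis API, and UTF-16LE is used on the assumption that it approximates SimpleString's layout of two bytes per character.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Artemis: mirrors the branching in
// Message.getDuplicateIDBytes() using only the JDK.
class DuplicateIdBytes {

    public static byte[] toBytes(Object duplicateId) {
        if (duplicateId == null) {
            return null; // no duplicate ID on the message
        }
        if (duplicateId instanceof CharSequence) {
            // Assumption: stands in for the SimpleString encoding (two bytes per char).
            return duplicateId.toString().getBytes(StandardCharsets.UTF_16LE);
        }
        return (byte[]) duplicateId; // anything else must already be a byte[]
    }

    public static void main(String[] args) {
        System.out.println(toBytes("order-42").length); // 16: eight chars, two bytes each
        System.out.println(toBytes(null) == null);      // true
    }
}
```

One consequence of this shape is that a text ID and a raw byte[] ID only compare equal downstream if their bytes match exactly, so a producer should stick to one representation.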

CoreMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

public class CoreMessage extends RefCountMessage implements ICoreMessage {

   //......

   public Object getDuplicateProperty() {
      return getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
   }

   //......

}

  • CoreMessage implements the ICoreMessage interface, which in turn extends Message; its getDuplicateProperty method returns the value of the Message.HDR_DUPLICATE_DETECTION_ID property.

checkDuplicateID

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   private final ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = new ConcurrentHashMap<>();

   //......

   public RoutingStatus route(final Message message,
                              final RoutingContext context,
                              final boolean direct,
                              boolean rejectDuplicates,
                              final Binding bindingMove) throws Exception {

      RoutingStatus result;
      // Sanity check
      if (message.getRefCount() > 0) {
         throw new IllegalStateException("Message cannot be routed more than once");
      }

      final SimpleString address = context.getAddress(message);

      setPagingStore(address, message);

      AtomicBoolean startedTX = new AtomicBoolean(false);

      applyExpiryDelay(message, address);

      if (context.isDuplicateDetection() && !checkDuplicateID(message, context, rejectDuplicates, startedTX)) {
         return RoutingStatus.DUPLICATED_ID;
      }

      message.clearInternalProperties();

      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);

      AddressInfo addressInfo = addressManager.getAddressInfo(address);

      //......

      if (server.hasBrokerMessagePlugins()) {
         server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, result));
      }

      return result;
   }

   private boolean checkDuplicateID(final Message message,
                                    final RoutingContext context,
                                    boolean rejectDuplicates,
                                    AtomicBoolean startedTX) throws Exception {
      // Check the DuplicateCache for the Bridge first

      Object bridgeDup = message.removeExtraBytesProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
      if (bridgeDup != null) {
         // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
         byte[] bridgeDupBytes = (byte[]) bridgeDup;

         DuplicateIDCache cacheBridge = getDuplicateIDCache(BRIDGE_CACHE_STR.concat(context.getAddress(message).toString()));

         if (context.getTransaction() == null) {
            context.setTransaction(new TransactionImpl(storageManager));
            startedTX.set(true);
         }

         if (!cacheBridge.atomicVerify(bridgeDupBytes, context.getTransaction())) {
            context.getTransaction().rollback();
            startedTX.set(false);
            message.decrementRefCount();
            return false;
         }
      } else {
         // if used BridgeDuplicate, it's not going to use the regular duplicate
         // since this will would break redistribution (re-setting the duplicateId)
         byte[] duplicateIDBytes = message.getDuplicateIDBytes();

         DuplicateIDCache cache = null;

         boolean isDuplicate = false;

         if (duplicateIDBytes != null) {
            cache = getDuplicateIDCache(context.getAddress(message));

            isDuplicate = cache.contains(duplicateIDBytes);

            if (rejectDuplicates && isDuplicate) {
               ActiveMQServerLogger.LOGGER.duplicateMessageDetected(message);

               String warnMessage = "Duplicate message detected - message will not be routed. Message information:" + message.toString();

               if (context.getTransaction() != null) {
                  context.getTransaction().markAsRollbackOnly(new ActiveMQDuplicateIdException(warnMessage));
               }

               message.decrementRefCount();

               return false;
            }
         }

         if (cache != null && !isDuplicate) {
            if (context.getTransaction() == null) {
               // We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
               context.setTransaction(new TransactionImpl(storageManager));

               startedTX.set(true);
            }

            cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get());
         }
      }

      return true;
   }

   public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
      DuplicateIDCache cache = duplicateIDCaches.get(address);

      if (cache == null) {
         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);

         DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);

         if (oldCache != null) {
            cache = oldCache;
         }
      }

      return cache;
   }

   //......

}

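  • When context.isDuplicateDetection() is true, PostOfficeImpl's route method calls checkDuplicateID and returns RoutingStatus.DUPLICATED_ID immediately if that call returns false. When bridgeDup is null, checkDuplicateID obtains duplicateIDBytes through message.getDuplicateIDBytes(); if the result is not null, it fetches the DuplicateIDCache for the address from duplicateIDCaches through getDuplicateIDCache and checks whether the cache already contains duplicateIDBytes. If it does and rejectDuplicates is true, the method returns false. If cache is not null and isDuplicate is false, the ID is added to the cache through cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()), after a transaction has been created if the context did not already have one.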
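
The non-bridge branch of that decision can be sketched with plain collections. This is a simplified model under stated assumptions, not the Artemis implementation: DuplicateCheckSketch and check are hypothetical names, IDs are plain strings, and the bridge branch, transactions, and persistence are all left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of the non-bridge branch of checkDuplicateID.
class DuplicateCheckSketch {

    // One set of seen IDs per address; a stand-in for duplicateIDCaches.
    private final Map<String, Set<String>> caches = new ConcurrentHashMap<>();

    /** Returns true if routing may continue, false if the message is rejected as a duplicate. */
    public boolean check(String address, String duplicateId, boolean rejectDuplicates) {
        if (duplicateId == null) {
            return true; // no ID on the message: nothing to check or record
        }
        Set<String> cache = caches.computeIfAbsent(address, a -> ConcurrentHashMap.newKeySet());
        boolean isDuplicate = cache.contains(duplicateId);
        if (rejectDuplicates && isDuplicate) {
            return false; // the real code also marks the transaction rollback-only
        }
        if (!isDuplicate) {
            cache.add(duplicateId); // the real code adds it through a transaction
        }
        return true;
    }

    public static void main(String[] args) {
        DuplicateCheckSketch sketch = new DuplicateCheckSketch();
        System.out.println(sketch.check("orders", "id-1", true)); // true
        System.out.println(sketch.check("orders", "id-1", true)); // false
        System.out.println(sketch.check("audit", "id-1", true));  // true
    }
}
```

The last call shows that caches are kept per address, so the same ID sent to a different address is not treated as a duplicate.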

handleDuplicateIds

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java

public class PostOfficeJournalLoader implements JournalLoader {

   //......

   public void handleDuplicateIds(Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception {
      for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet()) {
         SimpleString address = entry.getKey();

         DuplicateIDCache cache = postOffice.getDuplicateIDCache(address);

         if (configuration.isPersistIDCache()) {
            cache.load(entry.getValue());
         }
      }
   }

   //......

}

  • PostOfficeJournalLoader's handleDuplicateIds method looks up the DuplicateIDCache for each address and, when configuration.isPersistIDCache() is true, calls cache.load(entry.getValue()).

DuplicateIDCache

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/DuplicateIDCache.java

public interface DuplicateIDCache {

   boolean contains(byte[] duplicateID);

   boolean atomicVerify(byte[] duplID, Transaction tx) throws Exception;

   void addToCache(byte[] duplicateID) throws Exception;

   void addToCache(byte[] duplicateID, Transaction tx) throws Exception;

   /**
    * it will add the data to the cache.
    * If TX == null it won't use a transaction.
    * if instantAdd=true, it won't wait a transaction to add on the cache which is needed on the case of the Bridges
    */
   void addToCache(byte[] duplicateID, Transaction tx, boolean instantAdd) throws Exception;

   void deleteFromCache(byte[] duplicateID) throws Exception;

   void load(List<Pair<byte[], Long>> theIds) throws Exception;

   void load(Transaction tx, byte[] duplID);

   void clear() throws Exception;

   List<Pair<byte[], Long>> getMap();
}

  • DuplicateIDCache defines methods such as contains, addToCache, and load.

DuplicateIDCacheImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java

public class DuplicateIDCacheImpl implements DuplicateIDCache {

   private static final Logger logger = Logger.getLogger(DuplicateIDCacheImpl.class);

   // ByteHolder, position
   private final Map<ByteArrayHolder, Integer> cache = new ConcurrentHashMap<>();

   private final SimpleString address;

   // Note - deliberately typed as ArrayList since we want to ensure fast indexed
   // based array access
   private final ArrayList<Pair<ByteArrayHolder, Long>> ids;

   private int pos;

   private final int cacheSize;

   private final StorageManager storageManager;

   private final boolean persist;

   public DuplicateIDCacheImpl(final SimpleString address,
                               final int size,
                               final StorageManager storageManager,
                               final boolean persist) {
      this.address = address;

      cacheSize = size;

      ids = new ArrayList<>(size);

      this.storageManager = storageManager;

      this.persist = persist;
   }

   @Override
   public void load(final List<Pair<byte[], Long>> theIds) throws Exception {
      long txID = -1;

      // If we have more IDs than cache size, we shrink the first ones
      int deleteCount = theIds.size() - cacheSize;
      if (deleteCount < 0) {
         deleteCount = 0;
      }

      for (Pair<byte[], Long> id : theIds) {
         if (deleteCount > 0) {
            if (txID == -1) {
               txID = storageManager.generateID();
            }
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl::load deleting id=" + describeID(id.getA(), id.getB()));
            }

            storageManager.deleteDuplicateIDTransactional(txID, id.getB());
            deleteCount--;
         } else {
            ByteArrayHolder bah = new ByteArrayHolder(id.getA());

            Pair<ByteArrayHolder, Long> pair = new Pair<>(bah, id.getB());

            cache.put(bah, ids.size());

            ids.add(pair);
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl::load loading id=" + describeID(id.getA(), id.getB()));
            }
         }
      }

      if (txID != -1) {
         storageManager.commit(txID);
      }

      pos = ids.size();

      if (pos == cacheSize) {
         pos = 0;
      }
   }

   public boolean contains(final byte[] duplID) {
      boolean contains = cache.get(new ByteArrayHolder(duplID)) != null;

      if (contains) {
         logger.trace("DuplicateIDCacheImpl(" + this.address + ")::constains found a duplicate " + describeID(duplID, 0));
      }
      return contains;
   }

   public synchronized void addToCache(final byte[] duplID, final Transaction tx, boolean instantAdd) throws Exception {
      long recordID = -1;

      if (tx == null) {
         if (persist) {
            recordID = storageManager.generateID();
            storageManager.storeDuplicateID(address, duplID, recordID);
         }

         addToCacheInMemory(duplID, recordID);
      } else {
         if (persist) {
            recordID = storageManager.generateID();
            storageManager.storeDuplicateIDTransactional(tx.getID(), address, duplID, recordID);

            tx.setContainsPersistent();
         }

         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
         }

         if (instantAdd) {
            tx.addOperation(new AddDuplicateIDOperation(duplID, recordID, false));
         } else {
            // For a tx, it's important that the entry is not added to the cache until commit
            // since if the client fails then resends them tx we don't want it to get rejected
            tx.afterStore(new AddDuplicateIDOperation(duplID, recordID, true));
         }
      }
   }

   private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID) {
      if (logger.isTraceEnabled()) {
         logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding " + describeID(duplID, recordID));
      }

      ByteArrayHolder holder = new ByteArrayHolder(duplID);

      cache.put(holder, pos);

      Pair<ByteArrayHolder, Long> id;

      if (pos < ids.size()) {
         // Need fast array style access here -hence ArrayList typing
         id = ids.get(pos);

         // The id here might be null if it was explicit deleted
         if (id.getA() != null) {
            if (logger.isTraceEnabled()) {
               logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory removing excess duplicateDetection " + describeID(id.getA().bytes, id.getB()));
            }

            cache.remove(id.getA());

            // Record already exists - we delete the old one and add the new one
            // Note we can't use update since journal update doesn't let older records get
            // reclaimed

            if (id.getB() != null) {
               try {
                  storageManager.deleteDuplicateID(id.getB());
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorDeletingDuplicateCache(e);
               }
            }
         }

         id.setA(holder);

         // The recordID could be negative if the duplicateCache is configured to not persist,
         // -1 would mean null on this case
         id.setB(recordID >= 0 ? recordID : null);

         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory replacing old duplicateID by " + describeID(id.getA().bytes, id.getB()));
         }

         holder.pos = pos;
      } else {
         id = new Pair<>(holder, recordID >= 0 ? recordID : null);

         if (logger.isTraceEnabled()) {
            logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCacheInMemory Adding new duplicateID " + describeID(id.getA().bytes, id.getB()));
         }

         ids.add(id);

         holder.pos = pos;
      }

      if (pos++ == cacheSize - 1) {
         pos = 0;
      }
   }

   //......

}

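  • DuplicateIDCacheImpl implements the DuplicateIDCache interface. Its load method loads the persisted IDs into cache, a map keyed by ByteArrayHolder; when it receives more IDs than cacheSize allows, the oldest surplus entries are deleted from storage instead of being loaded. Its contains method wraps the duplID byte array in a ByteArrayHolder and looks it up in cache. addToCache calls addToCacheInMemory directly when tx is null; otherwise it registers an AddDuplicateIDOperation on the transaction, through tx.addOperation when instantAdd is true and through tx.afterStore when it is false. addToCacheInMemory adds the entry to cache and keeps the cache within cacheSize by overwriting the oldest slot once the ids list is full.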
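
The slot-reuse behaviour is easier to see in a stripped-down model. The sketch below is an illustration under stated assumptions rather than the Artemis class: BoundedIdCache is a hypothetical name, IDs are strings instead of ByteArrayHolder, the capacity is assumed to be at least 1, load is assumed to run on an empty cache, and journal persistence is omitted entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a fixed-size duplicate ID cache; not the Artemis class.
class BoundedIdCache {

    private final Map<String, Integer> index = new HashMap<>(); // id -> slot
    private final List<String> slots;                           // slot -> id
    private final int capacity;                                 // assumed >= 1
    private int pos;                                            // next slot to write

    public BoundedIdCache(int capacity) {
        this.capacity = capacity;
        this.slots = new ArrayList<>(capacity);
    }

    public synchronized boolean contains(String id) {
        return index.containsKey(id);
    }

    public synchronized void add(String id) {
        if (index.containsKey(id)) {
            return; // this sketch ignores ids it already holds
        }
        if (pos < slots.size()) {
            index.remove(slots.get(pos)); // slot is occupied: evict the oldest id
            slots.set(pos, id);
        } else {
            slots.add(id);
        }
        index.put(id, pos);
        pos = (pos + 1) % capacity; // wrap around like a ring buffer
    }

    // Loads persisted ids into an empty cache, keeping only the newest `capacity` entries.
    public synchronized void load(List<String> persisted) {
        int skip = Math.max(0, persisted.size() - capacity);
        for (String id : persisted.subList(skip, persisted.size())) {
            index.put(id, slots.size());
            slots.add(id);
        }
        pos = slots.size() % capacity;
    }

    public static void main(String[] args) {
        BoundedIdCache cache = new BoundedIdCache(3);
        for (String id : Arrays.asList("a", "b", "c", "d")) {
            cache.add(id);
        }
        System.out.println(cache.contains("a")); // false: evicted when "d" reused slot 0
        System.out.println(cache.contains("d")); // true
    }
}
```

The practical takeaway is that detection only covers the most recent cacheSize IDs per address; an ID older than that window can be accepted again.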

AddDuplicateIDOperation

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java

   private final class AddDuplicateIDOperation extends TransactionOperationAbstract {

      final byte[] duplID;

      final long recordID;

      volatile boolean done;

      private final boolean afterCommit;

      AddDuplicateIDOperation(final byte[] duplID, final long recordID, boolean afterCommit) {
         this.duplID = duplID;
         this.recordID = recordID;
         this.afterCommit = afterCommit;
      }

      private void process() {
         if (!done) {
            addToCacheInMemory(duplID, recordID);

            done = true;
         }
      }

      @Override
      public void afterCommit(final Transaction tx) {
         if (afterCommit) {
            process();
         }
      }

      @Override
      public void beforeCommit(Transaction tx) throws Exception {
         if (!afterCommit) {
            process();
         }
      }

      @Override
      public List<MessageReference> getRelatedMessageReferences() {
         return null;
      }
   }

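  • AddDuplicateIDOperation extends TransactionOperationAbstract. Both afterCommit and beforeCommit can run process, but only one of them does for a given instance: beforeCommit when the afterCommit flag is false, afterCommit when it is true. process calls addToCacheInMemory(duplID, recordID), and the done flag ensures this happens only once.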
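
The effect of the afterCommit flag on ordering can be shown with a toy transaction. Everything here is a hypothetical stand-in (CommitTimingSketch, Operation, Tx, AddId); the real Transaction and TransactionOperation interfaces in Artemis have more hooks, and this model assumes a commit simply runs the before hooks, commits, then runs the after hooks.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of when the in-memory add happens relative to the commit.
class CommitTimingSketch {

    /** Minimal stand-in for a transaction operation with two commit hooks. */
    interface Operation {
        void beforeCommit();
        void afterCommit();
    }

    /** Toy transaction: runs beforeCommit hooks, records the commit, then runs afterCommit hooks. */
    static class Tx {
        private final List<Operation> operations = new ArrayList<>();
        private final List<String> events;

        Tx(List<String> events) {
            this.events = events;
        }

        void addOperation(Operation op) {
            operations.add(op);
        }

        void commit() {
            operations.forEach(Operation::beforeCommit);
            events.add("commit");
            operations.forEach(Operation::afterCommit);
        }
    }

    /** Records the add exactly once, in the hook selected by the afterCommit flag. */
    static class AddId implements Operation {
        private final String id;
        private final boolean afterCommit;
        private final List<String> events;
        private boolean done;

        AddId(String id, boolean afterCommit, List<String> events) {
            this.id = id;
            this.afterCommit = afterCommit;
            this.events = events;
        }

        private void process() {
            if (!done) {
                events.add("add " + id);
                done = true;
            }
        }

        @Override
        public void beforeCommit() {
            if (!afterCommit) {
                process();
            }
        }

        @Override
        public void afterCommit() {
            if (afterCommit) {
                process();
            }
        }
    }

    public static List<String> run(boolean afterCommit) {
        List<String> events = new ArrayList<>();
        Tx tx = new Tx(events);
        tx.addOperation(new AddId("id-1", afterCommit, events));
        tx.commit();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [add id-1, commit]
        System.out.println(run(true));  // [commit, add id-1]
    }
}
```

In this model, a transaction that never commits runs neither hook, so the ID never reaches the in-memory cache, which matches the source comment about not rejecting a client that fails and resends.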

Summary

  • CoreMessage implements the ICoreMessage interface, which in turn extends Message; its getDuplicateProperty method returns the value of the Message.HDR_DUPLICATE_DETECTION_ID property.
  • When context.isDuplicateDetection() is true, PostOfficeImpl's route method calls checkDuplicateID and returns RoutingStatus.DUPLICATED_ID immediately if that call returns false. When bridgeDup is null, checkDuplicateID obtains duplicateIDBytes through message.getDuplicateIDBytes(); if the result is not null, it fetches the DuplicateIDCache from duplicateIDCaches through getDuplicateIDCache and checks whether it contains duplicateIDBytes. If it does and rejectDuplicates is true, the method returns false; if cache is not null and isDuplicate is false, the ID is added through cache.addToCache(duplicateIDBytes, context.getTransaction(), startedTX.get()).
  • PostOfficeJournalLoader's handleDuplicateIds method calls cache.load(entry.getValue()) when configuration.isPersistIDCache() is true.

doc

  • PostOfficeImpl
