聊聊artemismessage的priority

编程

priority

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java

public class CoreMessage extends RefCountMessage implements ICoreMessage {

//......

protected byte priority;

public byte getPriority() {

return priority;

}

public CoreMessage setPriority(byte priority) {

this.priority = priority;

messageChanged();

return this;

}

//......

}

  • CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法

messageReferences.add

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

//......

private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());

//......

private synchronized void internalAddTail(final MessageReference ref) {

refAdded(ref);

messageReferences.addTail(ref, getPriority(ref));

pendingMetrics.incrementMetrics(ref);

enforceRing(false);

}

private void internalAddHead(final MessageReference ref) {

queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());

pendingMetrics.incrementMetrics(ref);

refAdded(ref);

int priority = getPriority(ref);

messageReferences.addHead(ref, priority);

ref.setInDelivery(false);

}

private void internalAddSorted(final MessageReference ref) {

queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());

pendingMetrics.incrementMetrics(ref);

refAdded(ref);

int priority = getPriority(ref);

messageReferences.addSorted(ref, priority);

}

private int getPriority(MessageReference ref) {

try {

return ref.getMessage().getPriority();

} catch (Throwable e) {

ActiveMQServerLogger.LOGGER.unableToGetMessagePriority(e);

return 4; // the default one in case of failure

}

}

//......

}

  • QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,出现异常返回4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列

PriorityLinkedList

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java

public interface PriorityLinkedList<T> {

void addHead(T t, int priority);

void addTail(T t, int priority);

void addSorted(T t, int priority);

T poll();

void clear();

/**

* Returns the size of this list.<br>

* It is safe to be called concurrently.

*/

int size();

LinkedListIterator<T> iterator();

/**

* Returns {@code true} if empty, {@code false} otherwise.<br>

* It is safe to be called concurrently.

*/

boolean isEmpty();

}

  • PriorityLinkedList接口定义了根据priority的addHead、addTail、addSorted方法,其size以及isEmpty要求是线程安全的

PriorityLinkedListImpl

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java

public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {

private static final AtomicIntegerFieldUpdater<PriorityLinkedListImpl> SIZE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PriorityLinkedListImpl.class, "size");

protected LinkedListImpl<T>[] levels;

private volatile int size;

private int lastReset;

private int highestPriority = -1;

private int lastPriority = -1;

public PriorityLinkedListImpl(final int priorities) {

this(priorities, null);

}

public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {

levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);

for (int i = 0; i < priorities; i++) {

levels[i] = new LinkedListImpl<>(comparator);

}

}

private void checkHighest(final int priority) {

if (lastPriority != priority || priority > highestPriority) {

lastPriority = priority;

if (lastReset == Integer.MAX_VALUE) {

lastReset = 0;

} else {

lastReset++;

}

}

if (priority > highestPriority) {

highestPriority = priority;

}

}

@Override

public void addHead(final T t, final int priority) {

checkHighest(priority);

levels[priority].addHead(t);

exclusiveIncrementSize(1);

}

@Override

public void addTail(final T t, final int priority) {

checkHighest(priority);

levels[priority].addTail(t);

exclusiveIncrementSize(1);

}

@Override

public void addSorted(T t, int priority) {

checkHighest(priority);

levels[priority].addSorted(t);

exclusiveIncrementSize(1);

}

@Override

public T poll() {

T t = null;

// We are just using a simple prioritization algorithm:

// Highest priority refs always get returned first.

// This could cause starvation of lower priority refs.

// TODO - A better prioritization algorithm

for (int i = highestPriority; i >= 0; i--) {

LinkedListImpl<T> ll = levels[i];

if (ll.size() != 0) {

t = ll.poll();

if (t != null) {

exclusiveIncrementSize(-1);

if (ll.size() == 0) {

if (highestPriority == i) {

highestPriority--;

}

}

}

break;

}

}

return t;

}

@Override

public void clear() {

for (LinkedListImpl<T> list : levels) {

list.clear();

}

exclusiveSetSize(0);

}

private void exclusiveIncrementSize(int amount) {

SIZE_UPDATER.lazySet(this, this.size + amount);

}

private void exclusiveSetSize(int value) {

SIZE_UPDATER.lazySet(this, value);

}

@Override

public int size() {

return size;

}

@Override

public boolean isEmpty() {

return size == 0;

}

@Override

public LinkedListIterator<T> iterator() {

return new PriorityLinkedListIterator();

}

//......

}

  • PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;其addHead、addTail、addSorted先执行checkHighest(priority)维护highestPriority,之后调用对应priority的LinkedListImpl的addHead、addTail、addSorted方法,最后调用exclusiveIncrementSize方法递增size;其poll方法会从highestPriority的LinkedListImpl开始poll

小结

CoreMessage定义了priority属性(Values range from 0 (less priority) to 9 (more priority) inclusive),并提供了getPriority、setPriority方法;QueueImpl定义了messageReferences,其类型为PriorityLinkedList<MessageReference>;其internalAddTail、internalAddHead、internalAddSorted方法都会调用getPriority方法获取priority,出现异常返回4,之后通过messageReferences的addTail、addHead、addSorted方法添加到队列;PriorityLinkedListImpl实现了PriorityLinkedList接口,其构造器需要priorities参数,它使用Array.newInstance(LinkedListImpl.class, priorities)来创建并初始化levels数组,其数组元素类型为LinkedListImpl;PriorityLinkedListImpl的addHead、addTail、addSorted均委托给LinkedListImpl类

doc

  • QueueImpl

以上是 聊聊artemismessage的priority 的全部内容, 来源链接: utcz.com/z/513058.html

回到顶部