聊聊artemis的groupRebalance

编程

groupRebalance

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

//......

private volatile boolean groupRebalance;

private volatile int groupBuckets;

private MessageGroups<Consumer> groups;

//......

/**
 * Registers a consumer on this queue.
 * <p>
 * The work runs inside the CRITICAL_CONSUMER critical section and under this queue's monitor.
 *
 * @param consumer the consumer to add
 * @throws Exception if a consumer limit is configured and already reached
 */
public void addConsumer(final Consumer consumer) throws Exception {

if (logger.isDebugEnabled()) {

logger.debug(this + " adding consumer " + consumer);

}

enterCritical(CRITICAL_CONSUMER);

try {

synchronized (this) {

// Reject the consumer when a limit is configured and the queue is already full.
if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >= maxConsumers) {

throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);

}

// The first consumer decides the flag; any later consumer without direct-delivery support turns it off.
if (consumers.isEmpty()) {

this.supportsDirectDeliver = consumer.supportsDirectDelivery();

} else {

if (!consumer.supportsDirectDelivery()) {

this.supportsDirectDeliver = false;

}

}

cancelRedistributor();

ConsumerHolder<Consumer> newConsumerHolder = new ConsumerHolder<>(consumer);

// Dispatch bookkeeping is only touched when the holder was actually added.
if (consumers.add(newConsumerHolder)) {

int currentConsumerCount = consumers.size();

if (delayBeforeDispatch >= 0) {

// Only sets the dispatch start time if it still holds the -1 marker.
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());

}

if (currentConsumerCount >= consumersBeforeDispatch) {

// Flip the dispatching flag once; the thread that wins the CAS stamps the start time as "now".
if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {

dispatchStartTimeUpdater.set(this, System.currentTimeMillis());

}

}

}

// With group rebalancing enabled, every group-to-consumer assignment is dropped when a consumer is added.
// NOTE(review): presumably so groups get re-assigned across the new consumer set on later deliveries - confirm.
if (groupRebalance) {

groups.removeAll();

}

if (refCountForConsumers != null) {

refCountForConsumers.increment();

}

}

} finally {

leaveCritical(CRITICAL_CONSUMER);

}

}

/**
 * Selects the {@link MessageGroups} implementation for the given setting.
 *
 * @param groupBuckets {@code -1} selects {@code SimpleMessageGroups}, {@code 0} selects the shared
 *                     {@code DisabledMessageGroups} instance, any other value is passed to
 *                     {@code BucketMessageGroups}
 * @return the message-group store matching {@code groupBuckets}
 */
public static MessageGroups<Consumer> groupMap(int groupBuckets) {
   switch (groupBuckets) {
      case -1:
         return new SimpleMessageGroups<>();
      case 0:
         return DisabledMessageGroups.instance();
      default:
         return new BucketMessageGroups<>(groupBuckets);
   }
}

//......

}

  • QueueImpl定义了groupRebalance属性,默认为false;addConsumer方法在groupRebalance为true时会执行groups.removeAll();它还定义了groupBuckets属性,默认为-1,此时groupMap方法创建的是SimpleMessageGroups

MessageGroups

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageGroups.java

/**
 * A store of message-group assignments, keyed by a {@link SimpleString} group key.
 *
 * @param <C> the type of value stored per group key (named "consumer" in the method signatures)
 */
public interface MessageGroups<C> {

/** Stores {@code consumer} under {@code key}. */
void put(SimpleString key, C consumer);

/** Looks up the value stored under {@code key}. */
C get(SimpleString key);

/** Removes the entry stored under {@code key}; the return value is implementation-defined here. */
C remove(SimpleString key);

/** Removes entries whose stored value matches {@code filter}. */
boolean removeIf(Predicate<? super C> filter);

/** Removes every entry. */
void removeAll();

/** Returns the number of entries as reported by the implementation. */
int size();

/** Returns the assignments as a {@link Map}. */
Map<SimpleString, C> toMap();

}

  • MessageGroups接口定义了put、get、remove、removeIf、removeAll、size、toMap方法

SimpleMessageGroups

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SimpleMessageGroups.java

/**
 * A {@link MapMessageGroups} whose backing map is a plain {@link HashMap}.
 *
 * @param <C> the type of value stored per group key
 */
public class SimpleMessageGroups<C> extends MapMessageGroups<C> {

/** Creates an empty instance backed by a new {@link HashMap}. */
public SimpleMessageGroups() {

super(new HashMap<>());

}

}

  • SimpleMessageGroups继承了MapMessageGroups,这里使用的是HashMap

MapMessageGroups

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MapMessageGroups.java

/**
 * Base {@link MessageGroups} implementation that forwards every operation to a {@link Map}
 * supplied by the subclass.
 *
 * @param <C> the type of value stored per group key
 */
abstract class MapMessageGroups<C> implements MessageGroups<C> {

   /** The map that actually holds the group assignments. */
   private final Map<SimpleString, C> backing;

   /**
    * @param groups the map used as storage; it is kept by reference, not copied
    */
   protected MapMessageGroups(Map<SimpleString, C> groups) {
      this.backing = groups;
   }

   @Override
   public void put(SimpleString key, C consumer) {
      backing.put(key, consumer);
   }

   @Override
   public C get(SimpleString key) {
      return backing.get(key);
   }

   @Override
   public C remove(SimpleString key) {
      return backing.remove(key);
   }

   /** Removes every entry whose value matches {@code filter}, via the map's values view. */
   @Override
   public boolean removeIf(Predicate<? super C> filter) {
      return backing.values().removeIf(filter);
   }

   @Override
   public void removeAll() {
      backing.clear();
   }

   @Override
   public int size() {
      return backing.size();
   }

   /** Returns a new {@link HashMap} holding a copy of the current assignments. */
   @Override
   public Map<SimpleString, C> toMap() {
      final Map<SimpleString, C> copy = new HashMap<>(backing);
      return copy;
   }
}

  • MapMessageGroups是个抽象类,它通过委托给内部持有的Map实现了MessageGroups接口定义的方法

handleMessageGroup

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

//......

private volatile boolean exclusive;

private final QueueConsumers<ConsumerHolder<? extends Consumer>> consumers = new QueueConsumersImpl<>();

private ConsumerHolder<Redistributor> redistributor;

//......

/**
 * Tries to hand {@code ref} directly to a consumer (or to the redistributor when one is set).
 * <p>
 * Runs entirely under this queue's monitor.
 *
 * @param ref the message reference to deliver
 * @return {@code true} if the reference was handled by a consumer or {@code checkExpired(ref)}
 *         returned {@code true}; {@code false} if direct delivery is not possible or no consumer
 *         handled it
 */
private boolean deliver(final MessageReference ref) {
   synchronized (this) {
      if (!supportsDirectDeliver) {
         return false;
      }

      // Parentheses only make the existing precedence explicit: && binds tighter than ||.
      if (isPaused() || (!canDispatch() && redistributor == null)) {
         return false;
      }

      if (checkExpired(ref)) {
         return true;
      }

      consumers.reset();

      while (consumers.hasNext() || redistributor != null) {
         // A redistributor, when present, takes precedence over the consumer iteration.
         ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
         Consumer consumer = holder.consumer;

         final SimpleString groupID = extractGroupID(ref);
         Consumer groupConsumer = getGroupConsumer(groupID);

         // A consumer already bound to the group (or the exclusive consumer) overrides the iterated one.
         if (groupConsumer != null) {
            consumer = groupConsumer;
         }

         HandleStatus status = handle(ref, consumer);

         if (status == HandleStatus.HANDLED) {
            final MessageReference reference;
            if (redistributor == null) {
               reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
            } else {
               reference = ref;
            }

            incrementMesssagesAdded();
            deliveriesInTransit.countUp();
            reference.setInDelivery(true);
            proceedDeliver(consumer, reference);
            consumers.reset();
            return true;
         }

         // With a redistributor or a bound group consumer there is only one candidate, so stop after it.
         if (redistributor != null || groupConsumer != null) {
            break;
         }
      }

      if (logger.isTraceEnabled()) {
         // Pass the name as an argument: tracef treats its first parameter as a printf format,
         // so a queue name containing '%' must not be concatenated into it.
         logger.tracef("Queue %s is out of direct delivery as no consumers handled a delivery", getName());
      }
      return false;
   }
}

/**
 * Updates exclusive/group bookkeeping after {@code consumer} has handled {@code ref}.
 *
 * @param ref           the reference that was handled
 * @param consumer      the consumer that handled it
 * @param groupConsumer the consumer previously bound to the group (or the exclusive consumer), or {@code null}
 * @param groupID       the group id extracted from {@code ref}, or {@code null}
 * @return {@code ref} itself, or a {@code GroupFirstMessageReference} wrapping it when a new
 *         binding was created and {@code groupFirstKey} is set
 */
private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
   if (exclusive) {
      if (groupConsumer == null) {
         // No exclusive consumer yet: this consumer becomes it.
         exclusiveConsumer = consumer;
         if (groupFirstKey != null) {
            return new GroupFirstMessageReference(groupFirstKey, ref);
         }
      }
      consumers.repeat();
      return ref;
   }

   if (groupID == null) {
      // Ungrouped message on a non-exclusive queue: nothing to track.
      return ref;
   }

   if (extractGroupSequence(ref) == -1) {
      // A sequence of -1 removes the group's binding.
      groups.remove(groupID);
      consumers.repeat();
      return ref;
   }

   if (groupConsumer != null) {
      // Group already bound.
      consumers.repeat();
      return ref;
   }

   // First message seen for this group: bind it to the consumer that handled it.
   groups.put(groupID, consumer);
   if (groupFirstKey != null) {
      return new GroupFirstMessageReference(groupFirstKey, ref);
   }
   return ref;
}

/**
 * Reads the group id of the message behind {@code ref}.
 *
 * @param ref the message reference
 * @return the message's group id, or {@code null} when the queue is internal, exclusive, has
 *         {@code groupBuckets == 0}, or reading the id threw
 */
private SimpleString extractGroupID(MessageReference ref) {
   final boolean groupingApplies = !internalQueue && !exclusive && groupBuckets != 0;
   if (!groupingApplies) {
      return null;
   }

   try {
      return ref.getMessage().getGroupID();
   } catch (Throwable e) {
      // Any failure is logged and treated as "no group".
      ActiveMQServerLogger.LOGGER.unableToExtractGroupID(e);
      return null;
   }
}

/**
 * Finds the consumer that overrides the one chosen round-robin, if any.
 *
 * @param groupID the message's group id, may be {@code null}
 * @return the exclusive consumer when the queue is exclusive; otherwise the consumer stored for
 *         {@code groupID}, or {@code null} when {@code groupID} is {@code null}
 */
private Consumer getGroupConsumer(SimpleString groupID) {
   if (exclusive) {
      // Exclusive mode takes priority over any group lookup.
      return exclusiveConsumer;
   }
   return groupID == null ? null : groups.get(groupID);
}

/**
 * Reads the group sequence of the message behind {@code ref}.
 *
 * @param ref the message reference
 * @return the message's group sequence, or {@code 0} when the queue is internal or reading the
 *         sequence threw
 */
private int extractGroupSequence(MessageReference ref) {
   // Group information is not used on internal (clustered) queues; otherwise the group map
   // would leak forever.
   if (!internalQueue) {
      try {
         return ref.getMessage().getGroupSequence();
      } catch (Throwable e) {
         // Log and fall through to the default of 0.
         ActiveMQServerLogger.LOGGER.unableToExtractGroupSequence(e);
      }
   }
   return 0;
}

//......

}

  • QueueImpl的deliver方法会从MessageReference抽取groupID,再根据groupID从groups中寻找groupConsumer;若不为null,则用它覆盖consumer,若为null则使用consumers.next()得到的consumer,然后执行handle方法;handle返回HANDLED之后,对于redistributor为null的会执行handleMessageGroup方法;handleMessageGroup方法对于非exclusive且groupID不为null的情况,若extractGroupSequence(ref)为-1则执行groups.remove(groupID)以及consumers.repeat(),否则若groupConsumer为null则执行groups.put(groupID, consumer)

ArrayResettableIterator

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ArrayResettableIterator.java

/**
 * A {@link ResettableIterator} over a fixed array that walks it as a ring.
 * <p>
 * The cursor is never rewound: {@link #reset()} marks the current cursor position as the end of
 * the next pass, so each pass starts where the previous one stopped and wraps around the array.
 *
 * @param <T> the element type
 */
public class ArrayResettableIterator<T> implements ResettableIterator<T> {

   private final Object[] array;

   /** Index of the element the next call to {@link #next()} returns. */
   private int cursor = 0;

   /** Cursor position at which the current pass ends. */
   private int endPos = -1;

   private boolean hasNext;

   /**
    * @param array the elements to iterate; kept by reference, not copied
    */
   public ArrayResettableIterator(Object[] array) {
      this.array = array;
      reset();
   }

   /**
    * Creates an iterator over an array obtained from {@code collection.toArray()}.
    *
    * @param collection the source collection
    * @return a new iterator over that array
    */
   public static <T> ResettableIterator<T> iterator(Collection<T> collection) {
      final Object[] elements = collection.toArray();
      return new ArrayResettableIterator<>(elements);
   }

   /** Starts a new pass from the current cursor; the pass is empty only for an empty array. */
   @Override
   public void reset() {
      endPos = cursor;
      hasNext = array.length > 0;
   }

   @Override
   public boolean hasNext() {
      return hasNext;
   }

   /**
    * Returns the element at the cursor and advances, wrapping to index 0 at the end of the array.
    *
    * @throws IllegalStateException if the current pass is exhausted
    */
   @Override
   public T next() {
      if (!hasNext) {
         throw new IllegalStateException();
      }

      final int current = cursor;
      @SuppressWarnings("unchecked") final T result = (T) array[current];

      final int advanced = current + 1;
      cursor = advanced == array.length ? 0 : advanced;

      // hasNext is known to be true here, so it only needs to change when the pass has come full circle.
      hasNext = cursor != endPos;

      return result;
   }
}

  • ArrayResettableIterator提供了reset方法,其遍历方式是round robin;reset方法会将endPos重置为当前的cursor,并将hasNext字段重置为array.length > 0

小结

QueueImpl定义了groupRebalance属性,默认为false;addConsumer方法在groupRebalance为true时会执行groups.removeAll();它还定义了groupBuckets属性,默认为-1,此时groupMap方法创建的是SimpleMessageGroups

doc

  • QueueImpl

以上是 聊聊artemis的groupRebalance 的全部内容, 来源链接: utcz.com/z/513129.html

回到顶部