聊聊artemis的FederatedQueue

编程

FederatedQueue

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/queue/FederatedQueue.java

/**
 * Queue federation plugin: for local consumers on matching queues it asks the parent
 * {@link FederatedAbstract} to create a consumer on the upstream broker.
 * <p>
 * Remote consumers are requested from two places: {@link #start()}, which walks the queue
 * bindings that already exist, and {@link #afterCreateConsumer(ServerConsumer)}, which is the
 * plugin callback for consumers created afterwards.
 */
public class FederatedQueue extends FederatedAbstract implements ActiveMQServerConsumerPlugin, Serializable {

   private static final Logger logger = Logger.getLogger(FederatedQueue.class);

   // Matchers built from the policy's include entries; the shared empty set when there are none.
   private final Set<Matcher> includes;

   // Matchers built from the policy's exclude entries; the shared empty set when there are none.
   private final Set<Matcher> excludes;

   // Filter applied to a session's metadata in createRemoteConsumer; a null filter disables the check.
   private final Filter metaDataFilter;

   // Upstream adjustment plus the policy adjustment (-1 when the policy does not set one).
   private final int priorityAdjustment;

   private final FederationQueuePolicyConfiguration config;

   /**
    * Builds the plugin from a queue policy.
    *
    * @param federation         the owning federation
    * @param config             the queue policy; its name must not be null
    * @param server             the local broker
    * @param federationUpstream the upstream this policy consumes from
    * @throws ActiveMQException declared by the constructor; presumably raised when the metadata
    *                           filter cannot be created (TODO confirm against FilterImpl)
    */
   public FederatedQueue(Federation federation, FederationQueuePolicyConfiguration config, ActiveMQServer server, FederationUpstream federationUpstream) throws ActiveMQException {
      super(federation, server, federationUpstream);
      Objects.requireNonNull(config.getName());
      this.config = config;
      this.priorityAdjustment = federationUpstream.getPriorityAdjustment() + (config.getPriorityAdjustment() == null ? -1 : config.getPriorityAdjustment());

      // With include-federated enabled the filter string is null, i.e. no metadata filtering.
      String metaDataFilterString = config.isIncludeFederated() ? null : "hyphenated_props:" + FederatedQueueConsumer.FEDERATION_NAME + " IS NOT NULL";
      metaDataFilter = FilterImpl.createFilter(metaDataFilterString);

      if (config.getIncludes().isEmpty()) {
         includes = Collections.emptySet();
      } else {
         includes = new HashSet<>(config.getIncludes().size());
         for (FederationQueuePolicyConfiguration.Matcher include : config.getIncludes()) {
            includes.add(new Matcher(include, wildcardConfiguration));
         }
      }

      if (config.getExcludes().isEmpty()) {
         excludes = Collections.emptySet();
      } else {
         excludes = new HashSet<>(config.getExcludes().size());
         for (FederationQueuePolicyConfiguration.Matcher exclude : config.getExcludes()) {
            excludes.add(new Matcher(exclude, wildcardConfiguration));
         }
      }
   }

   /**
    * Starts the parent and then evaluates every consumer of every existing queue binding.
    */
   @Override
   public void start() {
      super.start();
      server.getPostOffice()
         .getAllBindings()
         .values()
         .stream()
         .filter(b -> b instanceof QueueBinding)
         .map(b -> (QueueBinding) b)
         .forEach(b -> conditionalCreateRemoteConsumer(b.getQueue()));
   }

   /**
    * After a consumer has been created
    *
    * @param consumer the created consumer
    */
   @Override
   public synchronized void afterCreateConsumer(ServerConsumer consumer) {
      conditionalCreateRemoteConsumer(consumer);
   }

   /**
    * @return the queue policy this plugin was built from
    */
   public FederationQueuePolicyConfiguration getConfig() {
      return config;
   }

   /**
    * Asks every registered federation plugin whether a remote consumer may be created for the
    * given local consumer and, unless one of them answers {@code false}, creates it.
    *
    * @param consumer the local consumer being evaluated
    * @throws IllegalStateException if a federation plugin fails; the plugin's exception is the cause
    */
   private void conditionalCreateRemoteConsumer(ServerConsumer consumer) {
      if (server.hasBrokerFederationPlugins()) {
         final AtomicBoolean conditionalCreate = new AtomicBoolean(true);
         try {
            server.callBrokerFederationPlugins(plugin -> {
               // Once any plugin has vetoed, later plugins are no longer consulted (short-circuit &&).
               conditionalCreate.set(conditionalCreate.get() && plugin.federatedQueueConditionalCreateConsumer(consumer));
            });
         } catch (ActiveMQException t) {
            ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "federatedQueueConditionalCreateConsumer");
            // Chain the caught exception itself; using t.getCause() discarded its type and stack trace.
            throw new IllegalStateException(t.getMessage(), t);
         }
         if (!conditionalCreate.get()) {
            return;
         }
      }
      createRemoteConsumer(consumer);
   }

   /**
    * Evaluates every {@link ServerConsumer} currently attached to the queue.
    *
    * @param queue the local queue whose consumers are inspected
    */
   private void conditionalCreateRemoteConsumer(Queue queue) {
      queue.getConsumers()
         .stream()
         .filter(consumer -> consumer instanceof ServerConsumer)
         .map(c -> (ServerConsumer) c).forEach(this::conditionalCreateRemoteConsumer);
   }

   /**
    * Creates the remote consumer for a local consumer, unless the consumer's session is filtered
    * out by its metadata or the consumer does not match this policy.
    *
    * @param consumer the local consumer to mirror upstream
    */
   private void createRemoteConsumer(ServerConsumer consumer) {
      //We check the session meta data to see if its a federation session, if so by default we ignore these.
      //To not ignore these, set include-federated to true, which will mean no meta data filter.
      ServerSession serverSession = server.getSessionByID(consumer.getSessionID());
      if (metaDataFilter != null && serverSession != null && metaDataFilter.match(serverSession.getMetaData())) {
         return;
      }

      if (match(consumer)) {
         FederatedConsumerKey key = getKey(consumer);
         Transformer transformer = getTransformer(config.getTransformerRef());
         // Readdress each federated message to the key's fully qualified queue name.
         Transformer fqqnTransformer = message -> message == null ? null : message.setAddress(key.getFqqn());
         createRemoteConsumer(key, mergeTransformers(fqqnTransformer, transformer), null);
      }
   }

   //......

}

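  • FederatedQueue继承了FederatedAbstract,其start方法遍历PostOffice中的所有binding,筛选出QueueBinding,再对每个queue上的ServerConsumer挨个执行conditionalCreateRemoteConsumer;afterCreateConsumer回调同样会触发conditionalCreateRemoteConsumer;该方法先询问federation plugin是否允许创建,之后在createRemoteConsumer中经过metaDataFilter及match的判断,最终通过父类的createRemoteConsumer来创建remoteQueueConsumer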

FederatedAbstract

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedAbstract.java

public abstract class FederatedAbstract implements ActiveMQServerBasePlugin {

   //......

   /**
    * Ensures a remote consumer exists for {@code key} and increments its count.
    * <p>
    * Does nothing unless this instance has been started. When no consumer is registered for the
    * key yet, a {@link FederatedQueueConsumerImpl} is created, started and registered, with the
    * federation plugins notified before and after. In either case the consumer's count is
    * incremented.
    *
    * @param key         identifies the remote consumer
    * @param transformer transformer handed to the new remote consumer
    * @param callback    callback handed to the new remote consumer; callers in this file pass
    *                    {@code null}
    * @throws IllegalStateException if a federation plugin fails; the plugin's exception is the cause
    */
   public synchronized void createRemoteConsumer(FederatedConsumerKey key, Transformer transformer, ClientSessionCallback callback) {
      if (started) {
         FederatedQueueConsumer remoteQueueConsumer = remoteQueueConsumers.get(key);
         if (remoteQueueConsumer == null) {
            if (server.hasBrokerFederationPlugins()) {
               try {
                  server.callBrokerFederationPlugins(plugin -> plugin.beforeCreateFederatedQueueConsumer(key));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeCreateFederatedQueueConsumer");
                  // Chain the caught exception itself; using t.getCause() discarded its type and stack trace.
                  throw new IllegalStateException(t.getMessage(), t);
               }
            }

            remoteQueueConsumer = new FederatedQueueConsumerImpl(federation, server, transformer, key, upstream, callback);
            remoteQueueConsumer.start();
            remoteQueueConsumers.put(key, remoteQueueConsumer);

            if (server.hasBrokerFederationPlugins()) {
               try {
                  // Lambdas can only capture effectively final locals.
                  final FederatedQueueConsumer finalConsumer = remoteQueueConsumer;
                  server.callBrokerFederationPlugins(plugin -> plugin.afterCreateFederatedQueueConsumer(finalConsumer));
               } catch (ActiveMQException t) {
                  ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterCreateFederatedQueueConsumer");
                  // Chain the caught exception itself; using t.getCause() discarded its type and stack trace.
                  throw new IllegalStateException(t.getMessage(), t);
               }
            }
         }
         remoteQueueConsumer.incrementCount();
      }
   }

   //......

}

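  • FederatedAbstract的createRemoteConsumer在started为true且remoteQueueConsumers中尚无该key对应的consumer时,创建FederatedQueueConsumerImpl并执行其start方法,然后将其放入remoteQueueConsumers(创建前后分别回调federation plugin的beforeCreateFederatedQueueConsumer及afterCreateFederatedQueueConsumer);无论是否新建,最后都会执行remoteQueueConsumer.incrementCount()递增其计数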

FederatedQueueConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/federation/FederatedQueueConsumerImpl.java

public class FederatedQueueConsumerImpl implements FederatedQueueConsumer, SessionFailureListener {

private static final Logger logger = Logger.getLogger(FederatedQueueConsumerImpl.class);

private final ActiveMQServer server;

private final Federation federation;

private final FederatedConsumerKey key;

private final Transformer transformer;

private final FederationUpstream upstream;

private final AtomicInteger count = new AtomicInteger();

private final ScheduledExecutorService scheduledExecutorService;

private final int intialConnectDelayMultiplier = 2;

private final int intialConnectDelayMax = 30;

private final ClientSessionCallback clientSessionCallback;

private ClientSessionFactoryInternal clientSessionFactory;

private ClientSession clientSession;

private ClientConsumer clientConsumer;

public FederatedQueueConsumerImpl(Federation federation, ActiveMQServer server, Transformer transformer, FederatedConsumerKey key, FederationUpstream upstream, ClientSessionCallback clientSessionCallback) {

this.federation = federation;

this.server = server;

this.key = key;

this.transformer = transformer;

this.upstream = upstream;

this.scheduledExecutorService = server.getScheduledPool();

this.clientSessionCallback = clientSessionCallback;

}

@Override

public FederationUpstream getFederationUpstream() {

return upstream;

}

@Override

public Federation getFederation() {

return federation;

}

@Override

public FederatedConsumerKey getKey() {

return key;

}

@Override

public ClientSession getClientSession() {

return clientSession;

}

@Override

public int incrementCount() {

return count.incrementAndGet();

}

@Override

public int decrementCount() {

return count.decrementAndGet();

}

@Override

public void start() {

scheduleConnect(0);

}

private void scheduleConnect(int delay) {

scheduledExecutorService.schedule(() -> {

try {

connect();

} catch (Exception e) {

scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, intialConnectDelayMultiplier, intialConnectDelayMax));

}

}, delay, TimeUnit.SECONDS);

}

private void connect() throws Exception {

try {

if (clientConsumer == null) {

synchronized (this) {

this.clientSessionFactory = (ClientSessionFactoryInternal) upstream.getConnection().clientSessionFactory();

this.clientSession = clientSessionFactory.createSession(upstream.getUser(), upstream.getPassword(), false, true, true, clientSessionFactory.getServerLocator().isPreAcknowledge(), clientSessionFactory.getServerLocator().getAckBatchSize());

this.clientSession.addFailureListener(this);

this.clientSession.addMetaData(FEDERATION_NAME, federation.getName().toString());

this.clientSession.addMetaData(FEDERATION_UPSTREAM_NAME, upstream.getName().toString());

this.clientSession.start();

if (clientSessionCallback != null) {

clientSessionCallback.callback(clientSession);

}

if (clientSession.queueQuery(key.getQueueName()).isExists()) {

this.clientConsumer = clientSession.createConsumer(key.getQueueName(), key.getFilterString(), key.getPriority(), false);

this.clientConsumer.setMessageHandler(this);

} else {

throw new ActiveMQNonExistentQueueException("Queue " + key.getQueueName() + " does not exist on remote");

}

}

}

} catch (Exception e) {

try {

if (clientSessionFactory != null) {

clientSessionFactory.cleanup();

}

disconnect();

} catch (ActiveMQException ignored) {

}

throw e;

}

}

@Override

public void close() {

scheduleDisconnect(0);

}

private void scheduleDisconnect(int delay) {

scheduledExecutorService.schedule(() -> {

try {

disconnect();

} catch (Exception ignored) {

}

}, delay, TimeUnit.SECONDS);

}

private void disconnect() throws ActiveMQException {

if (clientConsumer != null) {

clientConsumer.close();

}

if (clientSession != null) {

clientSession.close();

}

clientConsumer = null;

clientSession = null;

if (clientSessionFactory != null && (!upstream.getConnection().isSharedConnection() ||

clientSessionFactory.numSessions() == 0)) {

clientSessionFactory.close();

clientSessionFactory = null;

}

}

@Override

public void onMessage(ClientMessage clientMessage) {

try {

if (server.hasBrokerFederationPlugins()) {

try {

server.callBrokerFederationPlugins(plugin -> plugin.beforeFederatedQueueConsumerMessageHandled(this, clientMessage));

} catch (ActiveMQException t) {

ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "beforeFederatedQueueConsumerMessageHandled");

throw new IllegalStateException(t.getMessage(), t.getCause());

}

}

Message message = transformer == null ? clientMessage : transformer.transform(clientMessage);

if (message != null) {

server.getPostOffice().route(message, true);

}

clientMessage.acknowledge();

if (server.hasBrokerFederationPlugins()) {

try {

server.callBrokerFederationPlugins(plugin -> plugin.afterFederatedQueueConsumerMessageHandled(this, clientMessage));

} catch (ActiveMQException t) {

ActiveMQServerLogger.LOGGER.federationPluginExecutionError(t, "afterFederatedQueueConsumerMessageHandled");

throw new IllegalStateException(t.getMessage(), t.getCause());

}

}

} catch (Exception e) {

try {

clientSession.rollback();

} catch (ActiveMQException e1) {

}

}

}

@Override

public void connectionFailed(ActiveMQException exception, boolean failedOver) {

connectionFailed(exception, failedOver, null);

}

@Override

public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {

try {

clientSessionFactory.cleanup();

clientSessionFactory.close();

clientConsumer = null;

clientSession = null;

clientSessionFactory = null;

} catch (Throwable dontCare) {

}

start();

}

@Override

public void beforeReconnect(ActiveMQException exception) {

}

public interface ClientSessionCallback {

void callback(ClientSession clientSession) throws ActiveMQException;

}

}

  • FederatedQueueConsumerImpl的start方法执行scheduleConnect方法,delay参数为0;scheduleConnect方法会使用scheduledExecutorService调度执行connect方法,在捕获到异常时计算新的delay再次执行scheduleConnect;其close方法执行scheduleDisconnect方法,delay参数为0;scheduleDisconnect方法则调度执行disconnect方法;connect方法通过upstream的clientSessionFactory创建clientSession并执行其start方法,之后创建clientConsumer并设置其messageHandler(onMessage方法主要是执行server.getPostOffice().route(message, true)以及clientMessage.acknowledge());disconnect方法则执行clientConsumer及clientSession的close

ClientSessionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java

public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {

   //......

   /**
    * Starts this session if it has not been started yet: starts each consumer returned by
    * {@code cloneConsumers()}, then tells the session context to start, then marks the session
    * as started. The closed check runs on every call, including when already started.
    *
    * @return this session
    * @throws ActiveMQException declared by the method; raised by the calls it makes
    */
   public ClientSessionImpl start() throws ActiveMQException {
      checkClosed();

      if (started) {
         return this;
      }

      for (ClientConsumerInternal consumer : cloneConsumers()) {
         consumer.start();
      }
      sessionContext.sessionStart();
      started = true;

      return this;
   }

   //......

}

  • ClientSessionImpl的start方法主要是执行clientConsumerInternal.start()及sessionContext.sessionStart()

ClientConsumerImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   /**
    * Clears the stopped flag and queues delivery runs for the messages already buffered.
    */
   public synchronized void start() {
      stopped = false;
      requeueExecutors();
   }

   /**
    * Queues one delivery run per buffered message (the buffer size is re-read on each iteration).
    */
   private void requeueExecutors() {
      for (int queued = 0; queued < buffer.size(); queued++) {
         queueExecutor();
      }
   }

   /**
    * Submits the shared runner to the session executor.
    */
   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }
      sessionExecutor.execute(runner);
   }

   /**
    * Delivery task: performs one {@code callOnMessage()} and records any exception it throws.
    */
   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);
            lastException = e;
         }
      }
   }

   /**
    * Takes at most one message from the buffer and hands it to the registered handler, or expires
    * it when it is already expired. Returns without polling when the consumer is closing or
    * stopped, or when no handler is set.
    *
    * @throws Exception declared by the method; raised by the calls it makes
    */
   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // Snapshot the handler: the field may be set to null while this runs, and calling
      // onMessage through the field would then fail with an NPE.
      final MessageHandler currentHandler = handler;
      if (currentHandler == null) {
         return;
      }

      if (rateLimiter != null) {
         rateLimiter.limit();
      }

      failedOver = false;

      // The message is pulled from the buffer inside the runnable rather than being bound to it
      // at submission time, so that priority ordering is preserved.
      final ClientMessageInternal next;
      synchronized (this) {
         next = buffer.poll();
      }
      if (next == null) {
         return;
      }

      if (next.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         //Ignore, this could be a relic from a previous receiveImmediate();
         return;
      }

      // Expiry is sampled before the flow-control call, as in the original ordering.
      final boolean alreadyExpired = next.isExpired();

      flowControlBeforeConsumption(next);

      if (alreadyExpired) {
         session.expire(this, next);
      } else {
         if (logger.isTraceEnabled()) {
            logger.trace(this + "::Calling handler.onMessage");
         }

         // Swap in the consumer's context class loader for the handler call, remembering the
         // thread's previous one so it can be restored afterwards.
         final ClassLoader previousLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
            @Override
            public ClassLoader run() {
               ClassLoader current = Thread.currentThread().getContextClassLoader();
               Thread.currentThread().setContextClassLoader(contextClassLoader);
               return current;
            }
         });

         onMessageThread = Thread.currentThread();
         try {
            currentHandler.onMessage(next);
         } finally {
            try {
               AccessController.doPrivileged(new PrivilegedAction<Object>() {
                  @Override
                  public Object run() {
                     Thread.currentThread().setContextClassLoader(previousLoader);
                     return null;
                  }
               });
            } catch (Exception e) {
               ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
            }

            onMessageThread = null;
         }

         if (logger.isTraceEnabled()) {
            logger.trace(this + "::Handler.onMessage done");
         }

         if (next.isLargeMessage()) {
            next.discardBody();
         }
      }

      // If slow consumer, we need to send 1 credit to make sure we get another message
      if (clientWindowSize == 0) {
         startSlowConsumer();
      }
   }

   //......

}

  • ClientConsumerImpl的start方法将stopped置为false,并通过requeueExecutors按buffer中的消息数量向sessionExecutor提交Runner;Runner的run方法则是执行callOnMessage方法,该方法会通过buffer.poll()拉取消息,对未过期的消息执行theHandler.onMessage(message)回调,对已过期的消息则执行session.expire

ActiveMQSessionContext

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java

public class ActiveMQSessionContext extends SessionContext {

   //......

   /**
    * Sends a {@code SESS_START} packet over the session channel.
    *
    * @throws ActiveMQException declared by the method; raised by the send call
    */
   public void sessionStart() throws ActiveMQException {
      final PacketImpl startPacket = new PacketImpl(PacketImpl.SESS_START);
      sessionChannel.send(startPacket);
   }

   //......

}

  • ActiveMQSessionContext的sessionStart方法通过sessionChannel发送PacketImpl.SESS_START消息

小结

FederatedQueueConsumerImpl的start方法执行scheduleConnect方法,delay参数为0;scheduleConnect方法会使用scheduledExecutorService调度执行connect方法,在捕获到异常时计算新的delay再次执行scheduleConnect;其close方法执行scheduleDisconnect方法,delay参数为0;scheduleDisconnect方法则调度执行disconnect方法;connect方法通过upstream的clientSessionFactory创建clientSession并执行其start方法,之后创建clientConsumer并设置其messageHandler;disconnect方法则执行clientConsumer及clientSession的close

doc

  • FederatedQueue

以上是 聊聊artemis的FederatedQueue 的全部内容, 来源链接: utcz.com/z/513468.html

回到顶部