聊聊artemis的ServerConnectionLifeCycleListener

编程

BaseConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java

/**
 * Callback contract for connection life-cycle events.
 *
 * <p>The meaning of each callback below is taken from its name and signature only;
 * confirm the exact invocation points against the implementing classes.
 *
 * @param <ProtocolClass> type of the protocol object handed to {@link #connectionCreated}
 */
public interface BaseConnectionLifeCycleListener<ProtocolClass> {

   /**
    * Callback for a created connection.
    *
    * @param component  the component associated with the connection
    * @param connection the connection that was created
    * @param protocol   the protocol object associated with the connection
    */
   void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolClass protocol);

   /**
    * Callback for a destroyed connection.
    *
    * @param connectionID identifier of the affected connection
    */
   void connectionDestroyed(Object connectionID);

   /**
    * Callback for an exception raised on a connection.
    *
    * @param connectionID identifier of the affected connection
    * @param me           the exception that was raised
    */
   void connectionException(Object connectionID, ActiveMQException me);

   /**
    * Callback for a change in a connection's write-readiness.
    *
    * @param connectionID identifier of the affected connection
    * @param ready        readiness flag; presumably {@code true} when the connection can
    *                     accept writes (TODO confirm against implementations)
    */
   void connectionReadyForWrites(Object connectionID, boolean ready);
}

  • BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法

ServerConnectionLifeCycleListener

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java

/**
 * Server-side flavour of {@link BaseConnectionLifeCycleListener} whose protocol type
 * parameter is fixed to {@link ProtocolManager}. It declares no additional methods.
 */
public interface ServerConnectionLifeCycleListener
   extends BaseConnectionLifeCycleListener<ProtocolManager> {
}

  • ServerConnectionLifeCycleListener继承了BaseConnectionLifeCycleListener,其泛型为ProtocolManager

RemotingServiceImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java

public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener {

//......

@Override

public void connectionCreated(final ActiveMQComponent component,

final Connection connection,

final ProtocolManager protocol) {

if (server == null) {

throw new IllegalStateException("Unable to create connection, server hasn"t finished starting up");

}

ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);

try {

if (server.hasBrokerConnectionPlugins()) {

server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));

}

} catch (ActiveMQException t) {

logger.warn("Error executing afterCreateConnection plugin method: {}", t.getMessage(), t);

throw new IllegalStateException(t.getMessage(), t.getCause());

}

if (logger.isTraceEnabled()) {

logger.trace("Connection created " + connection);

}

connections.put(connection.getID(), entry);

connectionCountLatch.countUp();

totalConnectionCount.incrementAndGet();

}

@Override

public void connectionDestroyed(final Object connectionID) {

if (logger.isTraceEnabled()) {

logger.trace("Connection removed " + connectionID + " from server " + this.server, new Exception("trace"));

}

issueFailure(connectionID, new ActiveMQRemoteDisconnectException());

}

private void issueFailure(Object connectionID, ActiveMQException e) {

ConnectionEntry conn = connections.get(connectionID);

if (conn != null && !conn.connection.isSupportReconnect()) {

RemotingConnection removedConnection = removeConnection(connectionID);

if (removedConnection != null) {

try {

if (server.hasBrokerConnectionPlugins()) {

server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection));

}

} catch (ActiveMQException t) {

logger.warn("Error executing afterDestroyConnection plugin method: {}", t.getMessage(), t);

conn.connection.fail(t);

return;

}

}

conn.connection.fail(e);

}

}

@Override

public void connectionException(final Object connectionID, final ActiveMQException me) {

issueFailure(connectionID, me);

}

@Override

public void connectionReadyForWrites(final Object connectionID, final boolean ready) {

}

//......

}

  • RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口。其connectionCreated方法在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection)),之后才把entry放入connections并更新连接计数;其connectionDestroyed与connectionException方法都委托给issueFailure:当连接存在且不支持重连时,issueFailure先执行removeConnection,若返回的removedConnection不为null且server.hasBrokerConnectionPlugins()为true,则执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection)),最后调用conn.connection.fail;connectionReadyForWrites方法为空实现

ActiveMQServerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

//......

public void registerBrokerPlugin(final ActiveMQServerBasePlugin plugin) {

configuration.registerBrokerPlugin(plugin);

plugin.registered(this);

}

public void unRegisterBrokerPlugin(final ActiveMQServerBasePlugin plugin) {

configuration.unRegisterBrokerPlugin(plugin);

plugin.unregistered(this);

}

public boolean hasBrokerConnectionPlugins() {

return !getBrokerConnectionPlugins().isEmpty();

}

public List<ActiveMQServerConnectionPlugin> getBrokerConnectionPlugins() {

return configuration.getBrokerConnectionPlugins();

}

public void callBrokerConnectionPlugins(final ActiveMQPluginRunnable<ActiveMQServerConnectionPlugin> pluginRun) throws ActiveMQException {

callBrokerPlugins(getBrokerConnectionPlugins(), pluginRun);

}

private <P extends ActiveMQServerBasePlugin> void callBrokerPlugins(final List<P> plugins, final ActiveMQPluginRunnable<P> pluginRun) throws ActiveMQException {

if (pluginRun != null) {

for (P plugin : plugins) {

try {

pluginRun.run(plugin);

} catch (Throwable e) {

if (e instanceof ActiveMQException) {

logger.debug("plugin " + plugin + " is throwing ActiveMQException");

throw (ActiveMQException) e;

} else {

logger.warn("Internal error on plugin " + pluginRun, e.getMessage(), e);

}

}

}

}

}

//......

}

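  • callBrokerPlugins方法在pluginRun不为null时会遍历plugins,然后挨个执行pluginRun.run(plugin)方法;执行过程中如果抛出ActiveMQException,则记录debug日志后重新抛出(后续plugin不再执行),其他Throwable仅记录warn日志,然后继续执行下一个plugin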

小结

BaseConnectionLifeCycleListener接口定义了connectionCreated、connectionDestroyed、connectionException、connectionReadyForWrites方法;RemotingServiceImpl实现了ServerConnectionLifeCycleListener接口,其connectionCreated在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterCreateConnection(entry.connection));其connectionDestroyed方法主要执行issueFailure,在server.hasBrokerConnectionPlugins()为true的情况下会执行server.callBrokerConnectionPlugins(plugin -> plugin.afterDestroyConnection(removedConnection))

doc

  • ServerConnectionLifeCycleListener

以上是 聊聊artemis的ServerConnectionLifeCycleListener 的全部内容, 来源链接: utcz.com/z/512942.html

回到顶部