A look at Artemis's CriticalAnalyzerPolicy


CriticalAnalyzerPolicy

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerPolicy.java

public enum CriticalAnalyzerPolicy {
   HALT, SHUTDOWN, LOG;

   static {
      // for URI support on ClusterConnection
      BeanSupport.registerConverter(new CriticalAnalyzerPolicyConverter(), CriticalAnalyzerPolicy.class);
   }

   static class CriticalAnalyzerPolicyConverter implements Converter {

      @Override
      public <T> T convert(Class<T> type, Object value) {
         return type.cast(CriticalAnalyzerPolicy.valueOf(value.toString()));
      }
   }

}

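  • CriticalAnalyzerPolicy defines three enum values: HALT, SHUTDOWN and LOG. Its static block registers a converter that turns a string into the enum through valueOf.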
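The converter above relies on Enum.valueOf, which matches constant names exactly. The sketch below illustrates that behaviour with a hypothetical stand-in enum (PolicySketch and fromValue are made-up names, not part of Artemis), so it compiles without the Artemis jars.

```java
// Hypothetical stand-in for CriticalAnalyzerPolicy; not the Artemis class.
enum PolicySketch {
    HALT, SHUTDOWN, LOG;

    // Same conversion step as the converter: value.toString(), then valueOf.
    static PolicySketch fromValue(Object value) {
        return PolicySketch.valueOf(value.toString());
    }
}

class PolicyConversionDemo {
    public static void main(String[] args) {
        // An exact constant name converts cleanly.
        System.out.println(PolicySketch.fromValue("SHUTDOWN"));

        // valueOf is case-sensitive, so a lower-case name is rejected.
        try {
            PolicySketch.fromValue("shutdown");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

If the real converter behaves like this sketch, a policy value written in lower case would be rejected rather than silently mapped, which is worth keeping in mind when writing configuration.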

initializeCriticalAnalyzer

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java

public class ActiveMQServerImpl implements ActiveMQServer {

   //......

   private void initializeCriticalAnalyzer() throws Exception {

      // Some tests will play crazy frequenceistop/start
      CriticalAnalyzer analyzer = this.getCriticalAnalyzer();
      if (analyzer == null) {
         if (configuration.isCriticalAnalyzer()) {
            // this will have its own ScheduledPool
            analyzer = new CriticalAnalyzerImpl();
         } else {
            analyzer = EmptyCriticalAnalyzer.getInstance();
         }

         this.analyzer = analyzer;
      }

      /* Calling this for cases where the server was stopped and now is being restarted... failback, etc...*/
      analyzer.clear();

      analyzer.setCheckTime(configuration.getCriticalAnalyzerCheckPeriod(), TimeUnit.MILLISECONDS).setTimeout(configuration.getCriticalAnalyzerTimeout(), TimeUnit.MILLISECONDS);

      if (configuration.isCriticalAnalyzer()) {
         analyzer.start();
      }

      CriticalAction criticalAction = null;
      final CriticalAnalyzerPolicy criticalAnalyzerPolicy = configuration.getCriticalAnalyzerPolicy();
      switch (criticalAnalyzerPolicy) {
         case HALT:
            criticalAction = criticalComponent -> {

               ActiveMQServerLogger.LOGGER.criticalSystemHalt(criticalComponent);

               threadDump();
               sendCriticalNotification(criticalComponent);

               Runtime.getRuntime().halt(70); // Linux systems will have /usr/include/sysexits.h showing 70 as internal software error

            };
            break;
         case SHUTDOWN:
            criticalAction = criticalComponent -> {

               ActiveMQServerLogger.LOGGER.criticalSystemShutdown(criticalComponent);

               threadDump();

               // on the case of a critical failure, -1 cannot simply means forever.
               // in case graceful is -1, we will set it to 30 seconds
               sendCriticalNotification(criticalComponent);

               // you can't stop from the check thread,
               // nor can use an executor
               Thread stopThread = new Thread() {
                  @Override
                  public void run() {
                     try {
                        ActiveMQServerImpl.this.stop();
                     } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                     }
                  }
               };
               stopThread.start();
            };
            break;
         case LOG:
            criticalAction = criticalComponent -> {
               ActiveMQServerLogger.LOGGER.criticalSystemLog(criticalComponent);
               threadDump();
               sendCriticalNotification(criticalComponent);
            };
            break;
      }

      analyzer.addAction(criticalAction);
   }

   //......

}

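  • initializeCriticalAnalyzer first fetches the CriticalAnalyzer and creates one if it is null: a CriticalAnalyzerImpl when configuration.isCriticalAnalyzer() is true, otherwise EmptyCriticalAnalyzer.getInstance(). It then calls clear, sets the check time and the timeout, starts the analyzer if it is enabled, and finally builds a criticalAction for the configured criticalAnalyzerPolicy and adds it to the analyzer. Each policy logs through its own ActiveMQServerLogger.LOGGER method (criticalSystemHalt, criticalSystemShutdown or criticalSystemLog) and then calls threadDump() and sendCriticalNotification(criticalComponent). Beyond that, HALT calls Runtime.getRuntime().halt(70), SHUTDOWN calls ActiveMQServerImpl.this.stop() on a new thread, and LOG does nothing further.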
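The mapping from policy to action can be sketched in isolation. The version below is a simplified illustration, not the Artemis code: PolicyActionSketch, Policy and actionFor are hypothetical names, and each step only appends a label to a trace list instead of logging, dumping threads or halting the JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PolicyActionSketch {
    // Hypothetical stand-in for CriticalAnalyzerPolicy.
    enum Policy { HALT, SHUTDOWN, LOG }

    // Builds the action for a policy. Every step is recorded in 'trace'
    // rather than executed, so the sketch is safe to run.
    static Consumer<String> actionFor(Policy policy, List<String> trace) {
        // Steps shared by all three policies: log, thread dump, notification.
        Consumer<String> common = component -> {
            trace.add("log:" + policy);
            trace.add("threadDump");
            trace.add("notify:" + component);
        };
        switch (policy) {
            case HALT:
                // The real code calls Runtime.getRuntime().halt(70) here.
                return common.andThen(c -> trace.add("halt(70)"));
            case SHUTDOWN:
                // The real code stops the server from a freshly created thread.
                return common.andThen(c -> trace.add("stop-on-new-thread"));
            case LOG:
            default:
                return common;
        }
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            List<String> trace = new ArrayList<>();
            actionFor(policy, trace).accept("journal");
            System.out.println(policy + " -> " + trace);
        }
    }
}
```

Composing the shared steps with andThen makes the difference between the policies visible at a glance; the original spells the shared calls out in each case instead.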

CriticalAnalyzer

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzer.java

public interface CriticalAnalyzer extends ActiveMQComponent {

   default void clear() {
   }

   default int getNumberOfComponents() {
      return 0;
   }

   boolean isMeasuring();

   void add(CriticalComponent component);

   void remove(CriticalComponent component);

   CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit);

   long getCheckTimeNanoSeconds();

   CriticalAnalyzer setTimeout(long timeout, TimeUnit unit);

   long getTimeout(TimeUnit unit);

   long getTimeoutNanoSeconds();

   CriticalAnalyzer addAction(CriticalAction action);

   void check();
}

  • The CriticalAnalyzer interface extends ActiveMQComponent and declares methods such as setCheckTime, setTimeout, addAction and check.

EmptyCriticalAnalyzer

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/EmptyCriticalAnalyzer.java

public class EmptyCriticalAnalyzer implements CriticalAnalyzer {

   private static final EmptyCriticalAnalyzer instance = new EmptyCriticalAnalyzer();

   public static EmptyCriticalAnalyzer getInstance() {
      return instance;
   }

   private EmptyCriticalAnalyzer() {
   }

   @Override
   public void add(CriticalComponent component) {

   }

   @Override
   public void remove(CriticalComponent component) {

   }

   @Override
   public boolean isMeasuring() {
      return false;
   }

   @Override
   public void start() throws Exception {

   }

   @Override
   public void stop() throws Exception {

   }

   @Override
   public long getTimeoutNanoSeconds() {
      return 0;
   }

   @Override
   public boolean isStarted() {
      return false;
   }

   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      return this;
   }

   @Override
   public long getCheckTimeNanoSeconds() {
      return 0;
   }

   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      return this;
   }

   @Override
   public long getTimeout(TimeUnit unit) {
      return 0;
   }

   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      return this;
   }

   @Override
   public void check() {

   }
}

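  • EmptyCriticalAnalyzer implements CriticalAnalyzer as a singleton whose methods are all no-ops: the setters return this, and the getters return 0 or false.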
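This is the null-object pattern: because the empty implementation still returns this from its setters, the chained setCheckTime(...).setTimeout(...) call in initializeCriticalAnalyzer works whether or not the analyzer is enabled. A minimal sketch with a reduced, hypothetical interface (Analyzer, RealAnalyzer, EmptyAnalyzer and create are illustrative names only):

```java
class NullObjectSketch {
    // Reduced, hypothetical version of the CriticalAnalyzer interface.
    interface Analyzer {
        Analyzer setCheckTime(long millis);
        Analyzer setTimeout(long millis);
        boolean isMeasuring();
    }

    // Stores the values it is given and reports that it is measuring.
    static final class RealAnalyzer implements Analyzer {
        long checkTimeMillis;
        long timeoutMillis;

        @Override
        public Analyzer setCheckTime(long millis) {
            this.checkTimeMillis = millis;
            return this;
        }

        @Override
        public Analyzer setTimeout(long millis) {
            this.timeoutMillis = millis;
            return this;
        }

        @Override
        public boolean isMeasuring() {
            return true;
        }
    }

    // Shared do-nothing instance: setters ignore their input but keep the chain alive.
    static final class EmptyAnalyzer implements Analyzer {
        static final EmptyAnalyzer INSTANCE = new EmptyAnalyzer();

        private EmptyAnalyzer() {
        }

        @Override
        public Analyzer setCheckTime(long millis) {
            return this;
        }

        @Override
        public Analyzer setTimeout(long millis) {
            return this;
        }

        @Override
        public boolean isMeasuring() {
            return false;
        }
    }

    // Mirrors the enabled/disabled branch in initializeCriticalAnalyzer.
    static Analyzer create(boolean enabled) {
        return enabled ? new RealAnalyzer() : EmptyAnalyzer.INSTANCE;
    }

    public static void main(String[] args) {
        // The same call chain runs against either implementation, with no null checks.
        System.out.println(create(true).setCheckTime(60_000).setTimeout(120_000).isMeasuring());
        System.out.println(create(false).setCheckTime(60_000).setTimeout(120_000).isMeasuring());
    }
}
```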

CriticalAnalyzerImpl

activemq-artemis-2.11.0/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/critical/CriticalAnalyzerImpl.java

public class CriticalAnalyzerImpl implements CriticalAnalyzer {

   private final Logger logger = Logger.getLogger(CriticalAnalyzer.class);

   private volatile long timeoutNanoSeconds;

   // one minute by default.. the server will change it for sure
   private volatile long checkTimeNanoSeconds = TimeUnit.SECONDS.toNanos(60);

   private final ActiveMQScheduledComponent scheduledComponent;

   private final AtomicBoolean running = new AtomicBoolean(false);

   public CriticalAnalyzerImpl() {
      // this will make the scheduled component to start its own pool

      /* Important: The scheduled component should have its own thread pool...
       *  otherwise in case of a deadlock, or a starvation of the server the analyzer won't pick up any
       *  issues and won't be able to shutdown the server or halt the VM
       */
      this.scheduledComponent = new ActiveMQScheduledComponent(null, null, checkTimeNanoSeconds, TimeUnit.NANOSECONDS, false) {
         @Override
         public void run() {
            logger.trace("Checking critical analyzer");
            check();
         }
      };

   }

   @Override
   public void clear() {
      actions.clear();
      components.clear();
   }

   private CopyOnWriteArrayList<CriticalAction> actions = new CopyOnWriteArrayList<>();

   private final ConcurrentHashSet<CriticalComponent> components = new ConcurrentHashSet<>();

   @Override
   public int getNumberOfComponents() {
      return components.size();
   }

   @Override
   public boolean isMeasuring() {
      return true;
   }

   @Override
   public void add(CriticalComponent component) {
      components.add(component);
   }

   @Override
   public void remove(CriticalComponent component) {
      components.remove(component);
   }

   @Override
   public CriticalAnalyzer setCheckTime(long timeout, TimeUnit unit) {
      this.checkTimeNanoSeconds = unit.toNanos(timeout);
      this.scheduledComponent.setPeriod(timeout, unit);
      return this;
   }

   @Override
   public long getCheckTimeNanoSeconds() {
      if (checkTimeNanoSeconds == 0) {
         checkTimeNanoSeconds = getTimeout(TimeUnit.NANOSECONDS) / 2;
      }
      return checkTimeNanoSeconds;
   }

   @Override
   public CriticalAnalyzer setTimeout(long timeout, TimeUnit unit) {
      if (checkTimeNanoSeconds <= 0) {
         this.setCheckTime(timeout / 2, unit);
      }
      this.timeoutNanoSeconds = unit.toNanos(timeout);
      return this;
   }

   @Override
   public long getTimeout(TimeUnit unit) {
      if (timeoutNanoSeconds == 0) {
         timeoutNanoSeconds = TimeUnit.MINUTES.toNanos(2);
      }
      return unit.convert(timeoutNanoSeconds, TimeUnit.NANOSECONDS);
   }

   @Override
   public long getTimeoutNanoSeconds() {
      return timeoutNanoSeconds;
   }

   @Override
   public CriticalAnalyzer addAction(CriticalAction action) {
      this.actions.add(action);
      return this;
   }

   @Override
   public void check() {
      boolean retry = true;
      while (retry) {
         try {
            for (CriticalComponent component : components) {

               if (component.isExpired(timeoutNanoSeconds)) {
                  fireAction(component);
                  // no need to keep running if there's already a component failed
                  return;
               }
            }
            retry = false; // got to the end of the list, no need to retry
         } catch (ConcurrentModificationException dontCare) {
            // lets retry on the loop
         }
      }
   }

   private void fireAction(CriticalComponent component) {
      for (CriticalAction action : actions) {
         try {
            action.run(component);
         } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
         }
      }

      actions.clear();
   }

   @Override
   public void start() {
      scheduledComponent.start();
   }

   @Override
   public void stop() {
      scheduledComponent.stop();
   }

   @Override
   public boolean isStarted() {
      return scheduledComponent.isStarted();
   }
}

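  • The CriticalAnalyzerImpl constructor creates an ActiveMQScheduledComponent that runs check on a schedule. clear empties actions and components; setCheckTime updates checkTimeNanoSeconds and calls scheduledComponent.setPeriod. check iterates over components and tests component.isExpired(timeoutNanoSeconds); on the first expired component it calls fireAction(component) and returns. fireAction runs action.run(component) for each action in turn and then clears actions, so the registered actions fire at most once until they are added again.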
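The check and fireAction flow can be tried out without a broker. The sketch below is a simplified model under stated assumptions: CheckLoopSketch and its Component interface are hypothetical, ConcurrentHashMap.newKeySet() stands in for Artemis's ConcurrentHashSet, the scheduling and the ConcurrentModificationException retry are left out, and a failing action is swallowed where the original logs a warning.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class CheckLoopSketch {
    // Hypothetical stand-in for CriticalComponent: only the expiry test is modelled.
    interface Component {
        boolean isExpired(long timeoutNanos);
    }

    private final Set<Component> components = ConcurrentHashMap.newKeySet();
    private final List<Consumer<Component>> actions = new CopyOnWriteArrayList<>();

    void add(Component component) {
        components.add(component);
    }

    void addAction(Consumer<Component> action) {
        actions.add(action);
    }

    // Fires the actions for the first expired component, then stops looking.
    void check(long timeoutNanos) {
        for (Component component : components) {
            if (component.isExpired(timeoutNanos)) {
                fireAction(component);
                return;
            }
        }
    }

    // Runs every registered action once, then forgets them all.
    private void fireAction(Component component) {
        for (Consumer<Component> action : actions) {
            try {
                action.accept(component);
            } catch (Throwable t) {
                // One failing action must not prevent the remaining ones from running.
            }
        }
        actions.clear();
    }

    public static void main(String[] args) {
        CheckLoopSketch analyzer = new CheckLoopSketch();
        analyzer.add(timeout -> true); // a component that always reports itself as expired
        analyzer.addAction(c -> System.out.println("critical action fired"));
        analyzer.check(1L); // runs the action
        analyzer.check(1L); // runs nothing: the actions were cleared by the first firing
    }
}
```

Clearing the actions after the first firing explains why initializeCriticalAnalyzer calls addAction again on every server start: without that, a restarted server would have nothing left to run.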

Summary

CriticalAnalyzerPolicy defines three enum values: HALT, SHUTDOWN and LOG. ActiveMQServerImpl's initializeCriticalAnalyzer method first fetches the CriticalAnalyzer and creates one if it is null: a CriticalAnalyzerImpl when configuration.isCriticalAnalyzer() is true, otherwise EmptyCriticalAnalyzer.getInstance(). It then calls clear, sets the check time and the timeout, and adds a criticalAction chosen by the configured criticalAnalyzerPolicy. Each policy logs through its own ActiveMQServerLogger.LOGGER method (criticalSystemHalt, criticalSystemShutdown or criticalSystemLog) and calls threadDump() and sendCriticalNotification(criticalComponent); HALT additionally calls Runtime.getRuntime().halt(70), SHUTDOWN additionally calls ActiveMQServerImpl.this.stop() on a new thread, and LOG does nothing more.

doc

  • ActiveMQServerImpl
