聊聊artemis的SlowConsumerReaperRunnable

编程

SlowConsumerPolicy

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/SlowConsumerPolicy.java

public enum SlowConsumerPolicy {

KILL, NOTIFY;

public static SlowConsumerPolicy getType(int type) {

switch (type) {

case 0:

return KILL;

case 1:

return NOTIFY;

default:

return null;

}

}

}

  • SlowConsumerPolicy定义了KILL、NOTIFY两个枚举值

SlowConsumerReaperRunnable

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   private final class SlowConsumerReaperRunnable implements Runnable {

private final SlowConsumerPolicy policy;

private final float threshold;

private final long checkPeriod;

private SlowConsumerReaperRunnable(long checkPeriod, float threshold, SlowConsumerPolicy policy) {

this.checkPeriod = checkPeriod;

this.policy = policy;

this.threshold = threshold;

}

@Override

public void run() {

float queueRate = getRate();

long queueMessages = getMessageCount();

if (logger.isDebugEnabled()) {

logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");

}

if (consumers.size() == 0) {

logger.debug("There are no consumers, no need to check slow consumer"s rate");

return;

} else {

float queueThreshold = threshold * consumers.size();

if (queueRate < queueThreshold && queueMessages < queueThreshold) {

if (logger.isDebugEnabled()) {

logger.debug("Insufficient messages received on queue "" + getName() + "" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");

}

return;

}

}

for (ConsumerHolder consumerHolder : consumers) {

Consumer consumer = consumerHolder.consumer();

if (consumer instanceof ServerConsumerImpl) {

ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;

float consumerRate = serverConsumer.getRate();

if (consumerRate < threshold) {

RemotingConnection connection = null;

ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();

RemotingService remotingService = server.getRemotingService();

for (RemotingConnection potentialConnection : remotingService.getConnections()) {

if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID())) {

connection = potentialConnection;

}

}

serverConsumer.fireSlowConsumer();

if (connection != null) {

ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);

if (policy.equals(SlowConsumerPolicy.KILL)) {

connection.killMessage(server.getNodeID());

remotingService.removeConnection(connection.getID());

connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));

} else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {

TypedProperties props = new TypedProperties();

props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, getConsumerCount());

props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);

props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, SimpleString.toSimpleString(connection.getRemoteAddress()));

if (connection.getID() != null) {

props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, SimpleString.toSimpleString(connection.getID().toString()));

}

props.putLongProperty(ManagementHelper.HDR_CONSUMER_NAME, serverConsumer.getID());

props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, SimpleString.toSimpleString(serverConsumer.getSessionID()));

Notification notification = new Notification(null, CoreNotificationType.CONSUMER_SLOW, props);

ManagementService managementService = ((PostOfficeImpl) postOffice).getServer().getManagementService();

try {

managementService.sendNotification(notification);

} catch (Exception e) {

ActiveMQServerLogger.LOGGER.failedToSendSlowConsumerNotification(notification, e);

}

}

}

}

}

}

}

}

  • SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

fireSlowConsumer

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

//......

public void fireSlowConsumer() {

if (slowConsumerListener != null) {

slowConsumerListener.onSlowConsumer(this);

}

}

//......

}

  • fireSlowConsumer执行的是slowConsumerListener.onSlowConsumer(this)方法

SlowConsumerDetection

activemq-artemis-2.11.0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java

   class SlowConsumerDetection implements SlowConsumerDetectionListener {

@Override

public void onSlowConsumer(ServerConsumer consumer) {

if (consumer.getProtocolData() != null && consumer.getProtocolData() instanceof AMQConsumer) {

AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();

ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(amqConsumer.getOpenwireDestination());

ActiveMQMessage advisoryMessage = new ActiveMQMessage();

try {

advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, amqConsumer.getId().toString());

protocolManager.fireAdvisory(context, topic, advisoryMessage, amqConsumer.getId(), null);

} catch (Exception e) {

ActiveMQServerLogger.LOGGER.warn("Error during method invocation", e);

}

}

}

}

  • SlowConsumerDetection实现了SlowConsumerDetectionListener接口,其onSlowConsumer方法执行的是protocolManager.fireAdvisory方法

RemotingConnectionImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java

public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {

//......

public void killMessage(SimpleString nodeID) {

if (channelVersion < DisconnectConsumerWithKillMessage.VERSION_INTRODUCED) {

return;

}

Channel clientChannel = getChannel(1, -1);

DisconnectConsumerWithKillMessage response = new DisconnectConsumerWithKillMessage(nodeID);

clientChannel.send(response, -1);

}

public void fail(final ActiveMQException me, String scaleDownTargetNodeID) {

synchronized (failLock) {

if (destroyed) {

return;

}

destroyed = true;

}

if (!(me instanceof ActiveMQRemoteDisconnectException)) {

ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());

}

try {

transportConnection.forceClose();

} catch (Throwable e) {

ActiveMQClientLogger.LOGGER.failedForceClose(e);

}

// Then call the listeners

callFailureListeners(me, scaleDownTargetNodeID);

callClosingListeners();

internalClose();

for (Channel channel : channels.values()) {

channel.returnBlocking(me);

}

}

//......

}

  • killMessage方法构造DisconnectConsumerWithKillMessage并通过clientChannel.send方法;fail方法则执行transportConnection.forceClose()、callFailureListeners、callClosingListeners、internalClose以及channel.returnBlocking方法

sendNotification

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java

public class ManagementServiceImpl implements ManagementService {

//......

public void sendNotification(final Notification notification) throws Exception {

if (logger.isTraceEnabled()) {

logger.trace("Sending Notification = " + notification +

", notificationEnabled=" + notificationsEnabled +

" messagingServerControl=" + messagingServerControl);

}

// This needs to be synchronized since we need to ensure notifications are processed in strict sequence

synchronized (this) {

if (messagingServerControl != null && notificationsEnabled) {

// We also need to synchronize on the post office notification lock

// otherwise we can get notifications arriving in wrong order / missing

// if a notification occurs at same time as sendQueueInfoToQueue is processed

synchronized (postOffice.getNotificationLock()) {

// First send to any local listeners

for (NotificationListener listener : listeners) {

try {

listener.onNotification(notification);

} catch (Exception e) {

// Exception thrown from one listener should not stop execution of others

ActiveMQServerLogger.LOGGER.errorCallingNotifListener(e);

}

}

// start sending notification *messages* only when server has initialised

// Note at backup initialisation we don"t want to send notifications either

// https://jira.jboss.org/jira/browse/HORNETQ-317

if (messagingServer == null || !messagingServer.isActive()) {

if (logger.isDebugEnabled()) {

logger.debug("ignoring message " + notification + " as the server is not initialized");

}

return;

}

long messageID = storageManager.generateID();

Message notificationMessage = new CoreMessage(messageID, 512);

// Notification messages are always durable so the user can choose whether to add a durable queue to

// consume them in

notificationMessage.setDurable(true);

notificationMessage.setAddress(managementNotificationAddress);

if (notification.getProperties() != null) {

TypedProperties props = notification.getProperties();

props.forEach(notificationMessage::putObjectProperty);

}

notificationMessage.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));

notificationMessage.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());

if (notification.getUID() != null) {

notificationMessage.putStringProperty(new SimpleString("foobar"), new SimpleString(notification.getUID()));

}

postOffice.route(notificationMessage, false);

}

}

}

}

//......

}

  • sendNotification方法会回调listeners的onNotification方法,之后通过postOffice.route(notificationMessage, false)发送notificationMessage

小结

SlowConsumerReaperRunnable实现了Runnable接口,其run方法会遍历consumers,对于ServerConsumerImply在其consumerRate小于threshold时执行serverConsumer.fireSlowConsumer();之后对于connection不为null的根据policy进行不同的处理,若为SlowConsumerPolicy.KILL则执行connection.killMessage(server.getNodeID())、connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())),若为SlowConsumerPolicy.NOTIFY则构建NotificationType为CoreNotificationType.CONSUMER_SLOW的notification执行managementService.sendNotification(notification)

doc

  • QueueImpl

以上是 聊聊artemis的SlowConsumerReaperRunnable 的全部内容, 来源链接: utcz.com/z/512996.html

回到顶部