聊聊carrera的BrokerMonitor

编程

本文主要研究一下carrera的BrokerMonitor

BrokerMonitor

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

public class BrokerMonitor extends BaseConfigMonitor {

private final static Logger LOGGER = LoggerFactory.getLogger(BrokerMonitor.class);

private ExecutorService executor = ExecutorUtils.newFixedThreadPool(100, "BrokerMonitor", 200);

private BrokerMonitorItem monitorItem = null;

@Override

protected void initMonitor(String broker, BrokerConfig brokerConfig) throws Exception {

doMonitor(broker, brokerConfig);

}

public BrokerMonitor(MonitorConfig monitorConfig) {

super("Broker", monitorConfig);

}

private void doMonitor(String broker, BrokerConfig config) throws InterruptedException {

if (monitorItem != null) {

// stop first.

LOGGER.info("Stop old monitor broker: {}", broker);

monitorItem.stop();

}

BrokerMonitorItem item = new BrokerMonitorItem(broker, config);

try {

item.start();

} catch (Exception e) {

LOGGER.error("broker monitor start exception, broker=" + broker, e);

}

}

@Override

public void shutdown() {

ExecutorUtils.shutdown(executor);

monitorItem.stop();

super.shutdown();

}

//......

}

  • BrokerMonitor继承了BaseConfigMonitor,其initMonitor方法执行doMonitor,其shutdown会关闭executor,同时执行monitorItem.stop();doMonitor方法判断monitorItem不为null的话,先执行monitorItem.stop(),之后创建BrokerMonitorItem,执行其start方法

BrokerMonitorItem

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/BrokerMonitor.java

    class BrokerMonitorItem {

private String broker;

private BrokerConfig config;

private volatile boolean isRunning = false;

private ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

public BrokerMonitorItem(String broker, BrokerConfig config) {

this.broker = broker;

this.config = config;

}

public void start() {

isRunning = true;

scheduledExecutor.submit(() -> {

while (isRunning) {

monitorNamesvr();

monitorBroker();

try {

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

LOGGER.info("broker<{}> [Active]", broker);

}

});

}

public void stop() {

isRunning = false;

ExecutorUtils.shutdown(scheduledExecutor);

}

private void monitorBroker() {

if (MapUtils.isEmpty(config.getBrokers()) || StringUtils.isBlank(config.getBrokerClusterAddrs())) {

return;

}

String nameSvr = config.getBrokerClusterAddrs().split(";")[0]; // use first namesvr.

for (Map.Entry<String, Set<String>> entry : config.getBrokers().entrySet()) {

String master = entry.getKey();

Set<String> slaves = entry.getValue();

executor.execute(() -> {

int j = 0;

for (; j < 2; ++j) {

try {

long masterOffset = Utils.checkReceive(broker, nameSvr, master);

if (masterOffset <= 0) {

continue;

}

Utils.checkSend(broker, nameSvr, master);

if (CollectionUtils.isNotEmpty(slaves)) {

for (String slave : slaves) {

long slaveOffset = Utils.checkReceive(broker, nameSvr, slave);

LOGGER.info("ReplicaDelayCheck broker={}, address={}->{}, masterOffset={}, slaveOffset={}, delayNum={}", broker, master.split(":")[0], slave.split(":")[0], masterOffset, slaveOffset, (masterOffset - slaveOffset));

if (slaveOffset <= 0) {

continue;

}

if (masterOffset - slaveOffset > 60) {

LOGGER.error(String.format("[AlarmReplicaDelayErr] broker=%s, address=%s->%s, delayNum=%s", broker, master.split(":")[0], slave.split(":")[0], (masterOffset - slaveOffset)));

}

}

}

break;

} catch (Throwable e) {

LOGGER.error("broker check broker exception, broker=" + broker, e);

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

}

if (j == 2) {

LOGGER.error(String.format("[AlarmCheckBrokerErr] broker=%s, namesvr=%s", broker, nameSvr));

}

});

}

}

private void monitorNamesvr() {

if (StringUtils.isBlank(config.getBrokerClusterAddrs())) {

LOGGER.info("broker:{}, brokerClusterAddrs is empty", config.getBrokerCluster());

return;

}

for (String nameSvr : config.getBrokerClusterAddrs().split(";")) {

executor.execute(() -> {

int j = 0;

for (; j < 2; ++j) {

try {

Utils.checkNameSvr(nameSvr, broker);

LOGGER.info(String.format("[NameSvrCheck] broker=%s, namesvr=%s", broker, nameSvr));

break;

} catch (Throwable e) {

LOGGER.error("broker checkNameSvr exception, broker=" + broker + ", namesvr=" + nameSvr, e);

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

LOGGER.error("broker checkNameSvr Thread.sleep exception, broker=" + broker, e);

}

}

if (j == 2) {

LOGGER.error(String.format("[AlarmNameSvrErr] broker=%s, namesvr=%s", broker, nameSvr));

}

});

}

}

}

  • BrokerMonitorItem的start方法会异步执行一个runnable,该runnable会不断执行monitorNamesvr、monitorBroker方法;monitorNamesvr方法会遍历nameSvr执行Utils.checkNameSvr(nameSvr, broker);monitorBroker方法会遍历config.getBrokers().entrySet(),挨个执行Utils.checkReceive(broker, nameSvr, master)以及Utils.checkSend(broker, nameSvr, master),对于slaves执行Utils.checkReceive(broker, nameSvr, slave)

Utils

DDMQ/carrera-monitor/src/main/java/com/xiaojukeji/carrera/monitor/broker/Utils.java

public class Utils {

private static final Logger logger = LoggerFactory.getLogger(Utils.class);

private static final Map<String, DefaultMQAdminExt> mqAdminExtMap = new ConcurrentHashMap<>();

private static final Map<String, DefaultMQPullConsumer> nameSvrCheckMap = new ConcurrentHashMap<>();

private static final Map<String, DefaultMQPullConsumer> brokerReceiveCheckMap = new ConcurrentHashMap<>();

private static final Map<String, DefaultMQProducer> brokerSendCheckMap = new ConcurrentHashMap<>();

//......

public static void checkNameSvr(String nameSvr, String cluster) throws MQClientException, InterruptedException {

getNameSvrCheckConsumer(nameSvr, cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues("SELF_TEST_TOPIC");

}

public static long checkReceive(String cluster, String nameSvr, String address)

throws MQClientException, NoSuchFieldException, SecurityException, IllegalArgumentException,

IllegalAccessException, InterruptedException, RemotingException, MQBrokerException {

DefaultMQPullConsumer consumer = getReceiveCheckConsumer(nameSvr, cluster, address);

Field f1 = DefaultMQPullConsumerImpl.class.getDeclaredField("mQClientFactory");

f1.setAccessible(true);

MQClientInstance instance = (MQClientInstance) f1.get(consumer.getDefaultMQPullConsumerImpl());

Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");

f.setAccessible(true);

Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");

f2.setAccessible(true);

ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);

service.shutdown();

service.awaitTermination(1000, TimeUnit.SECONDS);

ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f.get(instance);

HashMap<Long, String> addresses = new HashMap<>();

addresses.put(0L, address);

map.put("rmqmonitor_" + address, addresses);

MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);

boolean pullOk = false;

long maxOffset = -1;

for (int i = 0; i < 2; ++i) {

try {

maxOffset = consumer.getDefaultMQPullConsumerImpl().maxOffset(queue);

PullResult result = consumer.pull(queue, "*", maxOffset > 100 ? maxOffset - 10 : 0, 1);

if (result.getPullStatus() == PullStatus.FOUND) {

pullOk = true;

break;

} else if(result.getPullStatus() == PullStatus.NO_NEW_MSG) {

checkSend(cluster, nameSvr, address);

continue;

}

logger.warn("pull result failed, PullResult={}, cluster={}, namesvr={}, address={}", result, cluster, nameSvr, address);

} catch (Throwable e) {

logger.error("pull exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);

}

Thread.sleep(1000);

}

if (!pullOk) {

logger.error(String.format("[AlarmPullErr] cluster=%s, broker=%s", cluster, address));

} else {

logger.info("AlarmPullCheck cluster={}, broker={}", cluster, address);

}

return maxOffset;

}

public static void checkSend(String cluster, String nameSvr, String address) throws MQClientException, NoSuchFieldException,

SecurityException, InterruptedException, IllegalArgumentException, IllegalAccessException, UnsupportedEncodingException, MQBrokerException, RemotingException {

if (!isBrokerTopicWritable(cluster, nameSvr, address)) {

return;

}

DefaultMQProducer producer = getSendCheckProducer(nameSvr, cluster, address);

MQClientInstance instance = producer.getDefaultMQProducerImpl().getmQClientFactory();

Field f = MQClientInstance.class.getDeclaredField("brokerAddrTable");

f.setAccessible(true);

Field f2 = MQClientInstance.class.getDeclaredField("scheduledExecutorService");

f2.setAccessible(true);

ScheduledExecutorService service = (ScheduledExecutorService) f2.get(instance);

service.shutdown();

service.awaitTermination(1000, TimeUnit.SECONDS);

ConcurrentHashMap<String, HashMap<Long, String>> map = (ConcurrentHashMap<String, HashMap<Long, String>>) f

.get(instance);

HashMap<Long, String> addresses = new HashMap<>();

addresses.put(0L, address);

map.put("rmqmonitor_" + address, addresses);

MessageQueue queue = new MessageQueue("SELF_TEST_TOPIC", "rmqmonitor_" + address, 0);

boolean sendOk = false;

SendResult sendResult = null;

for (int i = 0; i < 2; i++) {

try {

Message msg = new Message("SELF_TEST_TOPIC", // topic

"TagA", // tag

("Hello RocketMQ " + i).getBytes()// body

);

sendResult = producer.send(msg, queue);

if (sendResult.getSendStatus() == SendStatus.SEND_OK || sendResult.getSendStatus() == SLAVE_NOT_AVAILABLE) {

sendOk = true;

break;

}

logger.warn("send result failed, SendResult={}, cluster={}, namesvr={}, address={}", sendResult, cluster, nameSvr, address);

} catch (Exception e) {

logger.error("send exception, cluster={}, namesvr={}, address={}", cluster, nameSvr, address, e);

}

Thread.sleep(1000);

}

// 报警

if (!sendOk) {

logger.error(String.format("[AlarmSendErr] cluster=%s, broker=%s, result=%s", cluster, address, sendResult == null ? "null" : sendResult.toString()));

} else {

logger.info("AlarmSendCheck cluster={}, broker={}, result={}", cluster, address, sendResult.toString());

}

}

//......

}

  • checkNameSvr方法执行getNameSvrCheckConsumer(nameSvr,cluster).getDefaultMQPullConsumerImpl().fetchPublishMessageQueues方法;checkReceive方法主要是从SELF_TEST_TOPIC拉取数据;checkSend方法主要是给SELF_TEST_TOPIC发送数据

小结

BrokerMonitor继承了BaseConfigMonitor,其initMonitor方法执行doMonitor,其shutdown会关闭executor,同时执行monitorItem.stop();doMonitor方法判断monitorItem不为null的话,先执行monitorItem.stop(),之后创建BrokerMonitorItem,执行其start方法

doc

  • BrokerMonitor

以上是 聊聊carrera的BrokerMonitor 的全部内容, 来源链接: utcz.com/z/512600.html

回到顶部