聊聊artemis的DiscoveryGroup

编程

DiscoveryGroup

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

public final class DiscoveryGroup implements ActiveMQComponent {

private static final Logger logger = Logger.getLogger(DiscoveryGroup.class);

private final List<DiscoveryListener> listeners = new ArrayList<>();

private final String name;

private Thread thread;

private boolean received;

private final Object waitLock = new Object();

private final Map<String, DiscoveryEntry> connectors = new ConcurrentHashMap<>();

private final long timeout;

private volatile boolean started;

private final String nodeID;

private final Map<String, String> uniqueIDMap = new HashMap<>();

private final BroadcastEndpoint endpoint;

private final NotificationService notificationService;

/**

* This is the main constructor, intended to be used

*

* @param nodeID

* @param name

* @param timeout

* @param endpointFactory

* @param service

* @throws Exception

*/

public DiscoveryGroup(final String nodeID,

final String name,

final long timeout,

BroadcastEndpointFactory endpointFactory,

NotificationService service) throws Exception {

this.nodeID = nodeID;

this.name = name;

this.timeout = timeout;

this.endpoint = endpointFactory.createBroadcastEndpoint();

this.notificationService = service;

}

@Override

public synchronized void start() throws Exception {

if (started) {

return;

}

endpoint.openClient();

started = true;

thread = new Thread(new DiscoveryRunnable(), "activemq-discovery-group-thread-" + name);

thread.setDaemon(true);

thread.start();

if (notificationService != null) {

TypedProperties props = new TypedProperties();

props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));

Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STARTED, props);

notificationService.sendNotification(notification);

}

}

/**

* This will start the DiscoveryRunnable and run it directly.

* This is useful for a test process where we need this execution blocking a thread.

*/

public void internalRunning() throws Exception {

endpoint.openClient();

started = true;

DiscoveryRunnable runnable = new DiscoveryRunnable();

runnable.run();

}

@Override

public void stop() {

synchronized (this) {

if (!started) {

return;

}

started = false;

}

synchronized (waitLock) {

waitLock.notifyAll();

}

try {

endpoint.close(false);

} catch (Exception e1) {

ActiveMQClientLogger.LOGGER.errorStoppingDiscoveryBroadcastEndpoint(endpoint, e1);

}

try {

if (thread != null) {

thread.interrupt();

thread.join(10000);

if (thread.isAlive()) {

ActiveMQClientLogger.LOGGER.timedOutStoppingDiscovery();

}

}

} catch (InterruptedException e) {

throw new ActiveMQInterruptedException(e);

}

thread = null;

if (notificationService != null) {

TypedProperties props = new TypedProperties();

props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name));

Notification notification = new Notification(nodeID, CoreNotificationType.DISCOVERY_GROUP_STOPPED, props);

try {

notificationService.sendNotification(notification);

} catch (Exception e) {

ActiveMQClientLogger.LOGGER.errorSendingNotifOnDiscoveryStop(e);

}

}

}

@Override

public boolean isStarted() {

return started;

}

public String getName() {

return name;

}

public synchronized List<DiscoveryEntry> getDiscoveryEntries() {

return new ArrayList<>(connectors.values());

}

//......

}

  • DiscoveryGroup的构造器会使用endpointFactory.createBroadcastEndpoint()创建endpoint;start方法会执行endpoint.openClient(),创建并执行DiscoveryRunnable

DiscoveryRunnable

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java

   class DiscoveryRunnable implements Runnable {

@Override

public void run() {

byte[] data = null;

while (started) {

try {

try {

data = endpoint.receiveBroadcast();

if (data == null) {

if (started) {

ActiveMQClientLogger.LOGGER.unexpectedNullDataReceived();

}

break;

}

} catch (Exception e) {

if (!started) {

return;

} else {

ActiveMQClientLogger.LOGGER.errorReceivingPacketInDiscovery(e);

}

}

ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(data);

String originatingNodeID = buffer.readString();

String uniqueID = buffer.readString();

checkUniqueID(originatingNodeID, uniqueID);

if (nodeID.equals(originatingNodeID)) {

if (checkExpiration()) {

callListeners();

}

// Ignore traffic from own node

continue;

}

int size = buffer.readInt();

boolean changed = false;

DiscoveryEntry[] entriesRead = new DiscoveryEntry[size];

// Will first decode all the elements outside of any lock

for (int i = 0; i < size; i++) {

TransportConfiguration connector = new TransportConfiguration();

connector.decode(buffer);

entriesRead[i] = new DiscoveryEntry(originatingNodeID, connector, System.currentTimeMillis());

}

synchronized (DiscoveryGroup.this) {

for (DiscoveryEntry entry : entriesRead) {

if (connectors.put(originatingNodeID, entry) == null) {

changed = true;

}

}

changed = changed || checkExpiration();

}

//only call the listeners if we have changed

//also make sure that we aren"t stopping to avoid deadlock

if (changed && started) {

if (logger.isTraceEnabled()) {

logger.trace("Connectors changed on Discovery:");

for (DiscoveryEntry connector : connectors.values()) {

logger.trace(connector);

}

}

callListeners();

}

synchronized (waitLock) {

received = true;

waitLock.notifyAll();

}

} catch (Throwable e) {

ActiveMQClientLogger.LOGGER.failedToReceiveDatagramInDiscovery(e);

}

}

}

}

  • DiscoveryRunnable实现了Runnable接口,其run方法通过endpoint.receiveBroadcast()接收数据,之后解析为DiscoveryEntry更新到connectors中;在changed为true时会执行callListeners,执行DiscoveryListener.connectorsChanged回调

JGroupsBroadcastEndpoint

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java

public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {

private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);

private final String channelName;

private boolean clientOpened;

private boolean broadcastOpened;

private JChannelWrapper channel;

private JGroupsReceiver receiver;

private JChannelManager manager;

public JGroupsBroadcastEndpoint(JChannelManager manager, String channelName) {

this.manager = manager;

this.channelName = channelName;

}

@Override

public void broadcast(final byte[] data) throws Exception {

if (logger.isTraceEnabled())

logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());

if (broadcastOpened) {

org.jgroups.Message msg = new org.jgroups.Message();

msg.setBuffer(data);

channel.send(msg);

}

}

@Override

public byte[] receiveBroadcast() throws Exception {

if (logger.isTraceEnabled())

logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());

if (clientOpened) {

return receiver.receiveBroadcast();

} else {

return null;

}

}

@Override

public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {

if (logger.isTraceEnabled())

logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());

if (clientOpened) {

return receiver.receiveBroadcast(time, unit);

} else {

return null;

}

}

@Override

public synchronized void openClient() throws Exception {

if (clientOpened) {

return;

}

internalOpen();

receiver = new JGroupsReceiver();

channel.addReceiver(receiver);

clientOpened = true;

}

@Override

public synchronized void openBroadcaster() throws Exception {

if (broadcastOpened)

return;

internalOpen();

broadcastOpened = true;

}

public abstract JChannel createChannel() throws Exception;

public JGroupsBroadcastEndpoint initChannel() throws Exception {

this.channel = manager.getJChannel(channelName, this);

return this;

}

protected void internalOpen() throws Exception {

channel.connect();

}

@Override

public synchronized void close(boolean isBroadcast) throws Exception {

if (isBroadcast) {

broadcastOpened = false;

} else {

channel.removeReceiver(receiver);

clientOpened = false;

}

internalCloseChannel(channel);

}

/**

* Closes the channel used in this JGroups Broadcast.

* Can be overridden by implementations that use an externally managed channel.

*

* @param channel

*/

protected synchronized void internalCloseChannel(JChannelWrapper channel) {

channel.close(true);

}

}

  • JGroupsBroadcastEndpoint是个抽象类,它声明实现了BroadcastEndpoint接口;其broadcast方法创建org.jgroups.Message然后使用JChannelWrapper发送消息;其receiveBroadcast方法使用JGroupsReceiver来receiveBroadcast;其openClient则创建JGroupsReceiver;internalOpen方法则是执行channel.connect()

BroadcastGroupImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BroadcastGroupImpl.java

public class BroadcastGroupImpl implements BroadcastGroup, Runnable {

//......

public synchronized void broadcastConnectors() throws Exception {

ActiveMQBuffer buff = ActiveMQBuffers.dynamicBuffer(4096);

buff.writeString(nodeManager.getNodeId().toString());

buff.writeString(uniqueID);

buff.writeInt(connectors.size());

for (TransportConfiguration tcConfig : connectors) {

tcConfig.encode(buff);

}

// Only send as many bytes as we need.

byte[] data = new byte[buff.readableBytes()];

buff.getBytes(buff.readerIndex(), data);

endpoint.broadcast(data);

}

public void run() {

if (!started) {

return;

}

try {

broadcastConnectors();

loggedBroadcastException = false;

} catch (Exception e) {

// only log the exception at ERROR level once, even if it fails multiple times in a row - HORNETQ-919

if (!loggedBroadcastException) {

ActiveMQServerLogger.LOGGER.errorBroadcastingConnectorConfigs(e);

loggedBroadcastException = true;

} else {

logger.debug("Failed to broadcast connector configs...again", e);

}

}

}

//......

}

  • BroadcastGroupImpl实现了BroadcastGroup及Runnable方法,其run方法执行broadcastConnectors;broadcastConnectors方法则遍历connectors将TransportConfiguration写入到buff中之后通过endpoint.broadcast(data)广播出去

小结

DiscoveryGroup的构造器会使用endpointFactory.createBroadcastEndpoint()创建endpoint;start方法会执行endpoint.openClient(),创建并执行DiscoveryRunnable;DiscoveryRunnable实现了Runnable接口,其run方法通过endpoint.receiveBroadcast()接收数据,之后解析为DiscoveryEntry更新到connectors中;在changed为true时会执行callListeners,执行DiscoveryListener.connectorsChanged回调;JGroupsBroadcastEndpoint是个抽象类,它声明实现了BroadcastEndpoint接口;其broadcast方法创建org.jgroups.Message然后使用JChannelWrapper发送消息;其receiveBroadcast方法使用JGroupsReceiver来receiveBroadcast;其openClient则创建JGroupsReceiver;internalOpen方法则是执行channel.connect()

doc

  • DiscoveryGroup

以上是 聊聊artemis的DiscoveryGroup 的全部内容, 来源链接: utcz.com/z/513331.html

回到顶部