A Look at SkyWalking's RemoteClientManager

This article takes a look at SkyWalking's RemoteClientManager.

RemoteClientManager

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java

public class RemoteClientManager implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);

    private final ModuleDefineHolder moduleDefineHolder;
    private ClusterNodesQuery clusterNodesQuery;
    private volatile List<RemoteClient> usingClients;
    private GaugeMetrics gauge;
    private int remoteTimeout;

    /**
     * Initial the manager for all remote communication clients.
     *
     * @param moduleDefineHolder for looking up other modules
     * @param remoteTimeout for cluster internal communication, in second unit.
     */
    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
        this.moduleDefineHolder = moduleDefineHolder;
        this.usingClients = ImmutableList.of();
        this.remoteTimeout = remoteTimeout;
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server
     * orderly because of each of the server will send stream data to each other by hash code.
     */
    void refresh() {
        if (gauge == null) {
            gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
                .createGauge("cluster_size", "Cluster size of current oap node",
                    MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        }
        try {
            if (Objects.isNull(clusterNodesQuery)) {
                synchronized (RemoteClientManager.class) {
                    if (Objects.isNull(clusterNodesQuery)) {
                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Refresh remote nodes collection.");
            }

            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
            instanceList = distinct(instanceList);
            Collections.sort(instanceList);

            gauge.setValue(instanceList.size());

            if (logger.isDebugEnabled()) {
                instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString()));
            }

            if (!compare(instanceList)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ReBuilding remote clients.");
                }
                reBuildRemoteClients(instanceList);
            }

            printRemoteClientList();
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    //......
}

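  • RemoteClientManager exposes getRemoteClient, which returns usingClients, and start, which schedules refresh on a single-threaded scheduled executor: the first run happens after 1 second, and each later run starts 5 seconds after the previous one finishes (scheduleWithFixedDelay). refresh calls clusterNodesQuery.queryRemoteNodes() to get instanceList, de-duplicates it by Address, sorts it, and compares it with the local RemoteClient list. If the two differ, it calls reBuildRemoteClients. It always finishes by calling printRemoteClientList.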
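The de-duplicate, sort, and compare steps can be sketched in isolation. The bodies of distinct and compare are elided in the excerpt above, so the following is not their implementation. It is a minimal illustration in which RefreshSketch, normalize, and needsRebuild are hypothetical names and each node address is assumed to be a plain "host:port" string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical illustration only; not SkyWalking code.
final class RefreshSketch {

    // Assumption: each node address is modeled as a plain "host:port" string.
    static List<String> normalize(List<String> queried) {
        // A LinkedHashSet drops duplicate addresses.
        List<String> nodes = new ArrayList<>(new LinkedHashSet<>(queried));
        // Sorting gives every node the same ordering of the cluster members.
        Collections.sort(nodes);
        return nodes;
    }

    // True when the normalized query result differs from the addresses in use.
    static boolean needsRebuild(List<String> inUse, List<String> queried) {
        return !inUse.equals(normalize(queried));
    }
}
```

A stable ordering matters here because, as the Javadoc on refresh says, the servers route stream data to each other by hash code, so every node has to index into the same ordered list.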

reBuildRemoteClients

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java

public class RemoteClientManager implements Service {

    //......

    private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
            .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));

        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
            .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));

        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());

        unChangeAddresses.stream()
            .filter(remoteClientCollection::containsKey)
            .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged));

        // make the latestRemoteClients including the new clients only
        unChangeAddresses.forEach(latestRemoteClients::remove);
        remoteClientCollection.putAll(latestRemoteClients);

        final List<RemoteClient> newRemoteClients = new LinkedList<>();
        remoteClientCollection.forEach((address, clientAction) -> {
            switch (clientAction.getAction()) {
                case Unchanged:
                    newRemoteClients.add(clientAction.getRemoteClient());
                    break;
                case Create:
                    if (address.isSelf()) {
                        RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                        newRemoteClients.add(client);
                    } else {
                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                        client.connect();
                        newRemoteClients.add(client);
                    }
                    break;
            }
        });

        //for stable ordering for rolling selector
        Collections.sort(newRemoteClients);
        this.usingClients = ImmutableList.copyOf(newRemoteClients);

        remoteClientCollection.values()
            .stream()
            .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
            .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
    }

    //......
}

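  • reBuildRemoteClients first builds two maps keyed by Address: remoteClientCollection from the clients currently in use (each marked Close) and latestRemoteClients from the queried instances (each marked Create). The intersection of their key sets is unChangeAddresses; those entries are marked Unchanged in remoteClientCollection and removed from latestRemoteClients, and what remains in latestRemoteClients (new nodes only) is merged into remoteClientCollection. The method then walks remoteClientCollection: Unchanged clients are reused, and for Create it builds a SelfRemoteClient when address.isSelf() is true and a GRPCRemoteClient otherwise, calling connect() on the latter. Finally it sorts newRemoteClients, assigns an immutable copy to usingClients, and only then calls close() on the clients still marked Close.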
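The three-way split into Unchanged, Create, and Close comes down to set arithmetic on the addresses. A minimal sketch of that classification follows, assuming addresses are plain strings; RebuildPlanSketch and plan are hypothetical names, and the sketch deliberately leaves out client construction, connect(), and close().

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only; not SkyWalking code.
final class RebuildPlanSketch {
    final Set<String> unchanged = new TreeSet<>(); // in use and still in the cluster
    final Set<String> toCreate = new TreeSet<>();  // in the cluster but not yet in use
    final Set<String> toClose = new TreeSet<>();   // in use but gone from the cluster

    static RebuildPlanSketch plan(Set<String> inUse, Set<String> latest) {
        RebuildPlanSketch p = new RebuildPlanSketch();
        for (String address : inUse) {
            if (latest.contains(address)) {
                p.unchanged.add(address);
            } else {
                p.toClose.add(address);
            }
        }
        for (String address : latest) {
            if (!inUse.contains(address)) {
                p.toCreate.add(address);
            }
        }
        return p;
    }
}
```

With inUse = {a, b} and latest = {b, c}, the plan keeps b, creates c, and closes a. In the quoted method the close step runs after usingClients has been reassigned, so callers reading the new list never see a client that is about to be closed.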

RemoteSenderService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java

public class RemoteSenderService implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);

    private final ModuleManager moduleManager;
    private final HashCodeSelector hashCodeSelector;
    private final ForeverFirstSelector foreverFirstSelector;
    private final RollingSelector rollingSelector;

    public RemoteSenderService(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
        this.hashCodeSelector = new HashCodeSelector();
        this.foreverFirstSelector = new ForeverFirstSelector();
        this.rollingSelector = new RollingSelector();
    }

    public void send(String nextWorkName, StreamData streamData, Selector selector) {
        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
        RemoteClient remoteClient = null;

        List<RemoteClient> clientList = clientManager.getRemoteClient();
        if (clientList.size() == 0) {
            logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
            return;
        }

        switch (selector) {
            case HashCode:
                remoteClient = hashCodeSelector.select(clientList, streamData);
                break;
            case Rolling:
                remoteClient = rollingSelector.select(clientList, streamData);
                break;
            case ForeverFirst:
                remoteClient = foreverFirstSelector.select(clientList, streamData);
                break;
        }

        remoteClient.push(nextWorkName, streamData);
    }
}

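  • RemoteSenderService provides the send method. It gets clientList from clientManager.getRemoteClient(), logs a warning and returns if the list is empty, and otherwise picks one remoteClient according to the selector type and calls remoteClient.push(nextWorkName, streamData).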

RemoteClientSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java

public interface RemoteClientSelector {
    RemoteClient select(List<RemoteClient> clients, StreamData streamData);
}

  • RemoteClientSelector defines a single method, select.

HashCodeSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java

public class HashCodeSelector implements RemoteClientSelector {

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        int selectIndex = Math.abs(streamData.remoteHashCode()) % size;
        return clients.get(selectIndex);
    }
}

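  • HashCodeSelector implements RemoteClientSelector. It computes selectIndex as Math.abs(streamData.remoteHashCode()) % size, so a given hash code keeps mapping to the same position as long as the client list does not change.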
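One edge case of the Math.abs(...) % size expression is worth knowing: Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE, which is still negative, so the resulting index can be negative. Whether remoteHashCode() can ever return that value is not something the excerpt establishes. The sketch below only compares the expression with Math.floorMod, an alternative the quoted code does not use; HashIndexSketch and its method names are hypothetical.

```java
// Hypothetical illustration only; not SkyWalking code.
final class HashIndexSketch {

    // Mirrors the expression used in the excerpt.
    static int absIndex(int hash, int size) {
        return Math.abs(hash) % size;
    }

    // Alternative: floorMod always lands in [0, size) for a positive size.
    static int floorModIndex(int hash, int size) {
        return Math.floorMod(hash, size);
    }

    public static void main(String[] args) {
        int size = 3;
        System.out.println(absIndex(Integer.MIN_VALUE, size));      // negative index
        System.out.println(floorModIndex(Integer.MIN_VALUE, size)); // index within [0, size)
    }
}
```

The two functions also map ordinary negative hash codes to different positions (for example, -7 with size 3), so they are not interchangeable across a cluster: every node would need to use the same one.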

RollingSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java

public class RollingSelector implements RemoteClientSelector {

    private int index = 0;

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        index++;
        int selectIndex = Math.abs(index) % size;

        if (index == Integer.MAX_VALUE) {
            index = 0;
        }
        return clients.get(selectIndex);
    }
}

  • RollingSelector implements RemoteClientSelector. It increments index on every call and computes selectIndex as Math.abs(index) % size, resetting index to 0 once it reaches Integer.MAX_VALUE.
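The index field in RollingSelector is a plain int, and RemoteSenderService holds a single RollingSelector instance. The excerpt does not show whether send is called from several threads, but if it is, concurrent increments could be lost. As a rough sketch of the same round-robin idea with an atomic counter (RoundRobinSketch is a hypothetical name and not a drop-in replacement, since it starts at position 0 whereas the quoted code increments before selecting):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not SkyWalking code.
final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger();

    <T> T select(List<T> clients) {
        // getAndIncrement is atomic; the counter wraps to negative values after
        // Integer.MAX_VALUE, which floorMod still maps into [0, size).
        int next = counter.getAndIncrement();
        return clients.get(Math.floorMod(next, clients.size()));
    }
}
```

Because floorMod handles the wrap-around, no explicit reset at Integer.MAX_VALUE is needed in this variant.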

ForeverFirstSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java

public class ForeverFirstSelector implements RemoteClientSelector {

    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        if (logger.isDebugEnabled()) {
            logger.debug("clients size: {}", clients.size());
        }
        return clients.get(0);
    }
}

  • ForeverFirstSelector implements RemoteClientSelector. It always returns the first client in the list.

Summary

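RemoteClientManager exposes getRemoteClient, which returns usingClients, and start, which schedules refresh to run repeatedly with a 5-second delay between runs. refresh calls clusterNodesQuery.queryRemoteNodes() to get instanceList, de-duplicates it by Address, sorts it, and compares it with the local RemoteClient list. If the two differ, it calls reBuildRemoteClients, and it finishes by calling printRemoteClientList.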
