聊聊skywalking的DatabaseSlowStatement

编程

DatabaseSlowStatement

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DatabaseSlowStatement.java

/**
 * Source carrying the details of one database statement.
 *
 * <p>Declared under the {@code DATABASE_SLOW_STATEMENT} scope id with the scope name
 * {@code "DatabaseSlowStatement"}. Accessors for every field are generated by Lombok
 * through the class-level {@code @Getter}/{@code @Setter} annotations.
 */
@ScopeDeclaration(id = DATABASE_SLOW_STATEMENT, name = "DatabaseSlowStatement")
@Getter
@Setter
public class DatabaseSlowStatement extends Source {

    /** Identifier of this record. */
    private String id;

    /** Id of the service that represents the database. */
    private int databaseServiceId;

    /** Text of the statement. */
    private String statement;

    /** Latency recorded for the statement. */
    private long latency;

    /** Id of the trace the statement belongs to. */
    private String traceId;

    /**
     * @return the scope id of this source, {@code DefaultScopeDefine.DATABASE_SLOW_STATEMENT}
     */
    @Override
    public int scope() {
        return DefaultScopeDefine.DATABASE_SLOW_STATEMENT;
    }

    /**
     * @return always {@code Const.EMPTY_STRING}; this source does not expose an entity id
     */
    @Override
    public String getEntityId() {
        return Const.EMPTY_STRING;
    }
}

  • DatabaseSlowStatement继承了Source,它定义了id、databaseServiceId、statement、latency、traceId属性,其scope方法返回DefaultScopeDefine.DATABASE_SLOW_STATEMENT,getEntityId方法返回Const.EMPTY_STRING

MultiScopesSpanListener

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/endpoint/MultiScopesSpanListener.java

public class MultiScopesSpanListener implements EntrySpanListener, ExitSpanListener, GlobalTraceIdsListener {

/** Logger of this listener. */
private static final Logger logger = LoggerFactory.getLogger(MultiScopesSpanListener.class);

/** Receiver that every source produced by this listener is handed to in build(). */
private final SourceReceiver sourceReceiver;

/** Cache used by parseEntry/parseExit to look up service instance ids and instance records. */
private final ServiceInstanceInventoryCache instanceInventoryCache;

/** Cache used to map a network address id / peer id to a service id and service record. */
private final ServiceInventoryCache serviceInventoryCache;

/** Cache used in build() to look up the name of an exit source's source endpoint. */
private final EndpointInventoryCache endpointInventoryCache;

/** Builders created by parseEntry; turned into sources and forwarded in build(). */
private final List<SourceBuilder> entrySourceBuilders;

/** Builders created by parseExit; turned into relation/database sources in build(). */
private final List<SourceBuilder> exitSourceBuilders;

/** Statements collected by parseExit when a DB latency threshold is exceeded; forwarded in build(). */
private final List<DatabaseSlowStatement> slowDatabaseAccesses;

/** Trace receiver settings: gateway addresses, max slow SQL length and DB latency thresholds are read from it. */
private final TraceServiceModuleConfig config;

/** Cache used by parseEntry to resolve the address name of a reference's network address id. */
private final NetworkAddressInventoryCache networkAddressInventoryCache;

/** Span passed to the latest parseEntry call; stays null when parseEntry was never called. */
private SpanDecorator entrySpanDecorator;

/** Minute time bucket of the segment; set by parseEntry, or by parseExit while it is still 0. */
private long minuteTimeBucket;

/**
 * Trace id copied into every DatabaseSlowStatement created by parseExit. Initialised to null in
 * the constructor; the assignment is not part of this excerpt.
 * NOTE(review): presumably set by the GlobalTraceIdsListener callback - confirm.
 */
private String traceId;

/**
 * Creates a listener with empty builder lists and the collaborating services resolved.
 *
 * <p>Every service is looked up from the provider of the module registered under
 * {@code CoreModule.NAME}.
 *
 * @param moduleManager manager used to locate the core module and its services
 * @param config        trace receiver settings kept for the parse callbacks
 */
private MultiScopesSpanListener(ModuleManager moduleManager, TraceServiceModuleConfig config) {
    this.config = config;

    // Services provided by the core module.
    sourceReceiver =
        moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
    instanceInventoryCache =
        moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
    serviceInventoryCache =
        moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
    endpointInventoryCache =
        moduleManager.find(CoreModule.NAME).provider().getService(EndpointInventoryCache.class);
    networkAddressInventoryCache =
        moduleManager.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);

    // Per-listener accumulators, filled by the parse callbacks and drained by build().
    entrySourceBuilders = new LinkedList<>();
    exitSourceBuilders = new LinkedList<>();
    slowDatabaseAccesses = new ArrayList<>(10);

    // No trace id is known yet.
    traceId = null;
}

/**
 * Tells whether this listener handles the given point.
 *
 * @param point point to check
 * @return {@code true} for {@code Point.Entry}, {@code Point.Exit} and {@code Point.TraceIds},
 *         {@code false} for anything else
 */
@Override
public boolean containsPoint(Point point) {
    if (Point.Entry.equals(point)) {
        return true;
    }
    if (Point.Exit.equals(point)) {
        return true;
    }
    return Point.TraceIds.equals(point);
}

/**
 * Handles an entry span: records one SERVER-side source builder per reference of the span,
 * or a single builder with the {@code Const.USER_*} ids as source when the span has no reference.
 *
 * <p>Also stores the segment's minute time bucket and remembers the span as the entry span
 * used later by {@code build()}.
 *
 * @param spanDecorator   entry span being parsed
 * @param segmentCoreInfo core information of the segment that owns the span
 */
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
    this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();

    if (spanDecorator.getRefsCount() <= 0) {
        // No reference: the caller side is filled with the USER constants.
        SourceBuilder userSource = new SourceBuilder();
        userSource.setSourceEndpointId(Const.USER_ENDPOINT_ID);
        userSource.setSourceServiceInstanceId(Const.USER_INSTANCE_ID);
        userSource.setSourceServiceId(Const.USER_SERVICE_ID);
        completeEntrySource(userSource, spanDecorator, segmentCoreInfo);
    } else {
        for (int refIndex = 0; refIndex < spanDecorator.getRefsCount(); refIndex++) {
            ReferenceDecorator ref = spanDecorator.getRefs(refIndex);
            SourceBuilder refSource = new SourceBuilder();
            applyReferenceAsSource(refSource, ref, spanDecorator);
            completeEntrySource(refSource, spanDecorator, segmentCoreInfo);
        }
    }

    this.entrySpanDecorator = spanDecorator;
}

/** Fills the source endpoint, service instance and service of {@code target} from one reference. */
private void applyReferenceAsSource(SourceBuilder target, ReferenceDecorator reference,
                                    SpanDecorator spanDecorator) {
    if (reference.getParentEndpointId() != Const.INEXISTENCE_ENDPOINT_ID) {
        target.setSourceEndpointId(reference.getParentEndpointId());
    } else {
        target.setSourceEndpointId(Const.USER_ENDPOINT_ID);
    }

    final int networkAddressId = reference.getNetworkAddressId();
    final int peerServiceId = serviceInventoryCache.getServiceId(networkAddressId);
    final String peerAddress = networkAddressInventoryCache.get(networkAddressId).getName();

    final boolean resolveByPeer = spanDecorator.getSpanLayer().equals(SpanLayer.MQ)
        || config.getUninstrumentedGatewaysConfig().isAddressConfiguredAsGateway(peerAddress);

    if (!resolveByPeer) {
        // Regular case: the source is the parent instance named by the reference.
        target.setSourceServiceInstanceId(reference.getParentServiceInstanceId());
        target.setSourceServiceId(
            instanceInventoryCache.get(reference.getParentServiceInstanceId()).getServiceId());
        return;
    }

    // MQ layer or configured gateway address: the source is resolved from the network address.
    int peerInstanceId = instanceInventoryCache.getServiceInstanceId(peerServiceId, networkAddressId);
    target.setSourceServiceInstanceId(peerInstanceId);
    target.setSourceServiceId(peerServiceId);
}

/** Fills the destination side and shared attributes of {@code target}, then queues it as an entry source. */
private void completeEntrySource(SourceBuilder target, SpanDecorator spanDecorator,
                                 SegmentCoreInfo segmentCoreInfo) {
    target.setDestEndpointId(spanDecorator.getOperationNameId());
    target.setDestServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
    target.setDestServiceId(segmentCoreInfo.getServiceId());
    target.setDetectPoint(DetectPoint.SERVER);
    target.setComponentId(spanDecorator.getComponentId());
    setPublicAttrs(target, spanDecorator);
    entrySourceBuilders.add(target);
}

/**
 * Handles an exit span: records a CLIENT-side source builder for the call to the span's peer
 * and, for database requests, collects a {@link DatabaseSlowStatement} when the latency
 * exceeds the threshold configured for the span's database type.
 *
 * <p>Spans whose peer id is 0 are ignored. The segment's minute time bucket is stored only
 * when no earlier callback has set it.
 *
 * @param spanDecorator   exit span being parsed
 * @param segmentCoreInfo core information of the segment that owns the span
 */
@Override
public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
    if (this.minuteTimeBucket == 0) {
        this.minuteTimeBucket = segmentCoreInfo.getMinuteTimeBucket();
    }

    int peerId = spanDecorator.getPeerId();
    if (peerId == 0) {
        // Nothing to relate to; bail out before allocating a builder.
        return;
    }

    int destServiceId = serviceInventoryCache.getServiceId(peerId);
    int mappingServiceId = serviceInventoryCache.get(destServiceId).getMappingServiceId();
    int destInstanceId = instanceInventoryCache.getServiceInstanceId(destServiceId, peerId);
    int mappingServiceInstanceId = instanceInventoryCache.get(destInstanceId).getMappingServiceInstanceId();

    SourceBuilder sourceBuilder = new SourceBuilder();
    sourceBuilder.setSourceServiceInstanceId(segmentCoreInfo.getServiceInstanceId());
    sourceBuilder.setSourceServiceId(segmentCoreInfo.getServiceId());
    // A mapping id, when present, replaces the id resolved from the peer.
    sourceBuilder.setDestServiceId(Const.NONE == mappingServiceId ? destServiceId : mappingServiceId);
    sourceBuilder.setDestServiceInstanceId(
        Const.NONE == mappingServiceInstanceId ? destInstanceId : mappingServiceInstanceId);
    sourceBuilder.setDetectPoint(DetectPoint.CLIENT);
    sourceBuilder.setComponentId(spanDecorator.getComponentId());
    setPublicAttrs(sourceBuilder, spanDecorator);
    exitSourceBuilders.add(sourceBuilder);

    // Constant-first comparison, as in build(), so a builder without a type cannot throw here.
    if (RequestType.DATABASE.equals(sourceBuilder.getType())) {
        collectSlowStatement(sourceBuilder, spanDecorator, segmentCoreInfo);
    }
}

/**
 * Queues a {@link DatabaseSlowStatement} for the span when one of its {@code SpanTags.DB_TYPE}
 * tags names a database type whose threshold is lower than the builder's latency.
 */
private void collectSlowStatement(SourceBuilder sourceBuilder, SpanDecorator spanDecorator,
                                  SegmentCoreInfo segmentCoreInfo) {
    boolean isSlowDBAccess = false;
    String sqlStatement = null;

    for (KeyStringValuePair tag : spanDecorator.getAllTags()) {
        if (SpanTags.DB_STATEMENT.equals(tag.getKey())) {
            sqlStatement = tag.getValue();
        } else if (SpanTags.DB_TYPE.equals(tag.getKey())) {
            DBLatencyThresholdsAndWatcher thresholds = config.getDbLatencyThresholdsAndWatcher();
            int threshold = thresholds.getThreshold(tag.getValue());
            if (sourceBuilder.getLatency() > threshold) {
                isSlowDBAccess = true;
            }
        }
    }

    if (!isSlowDBAccess) {
        return;
    }

    DatabaseSlowStatement statement = new DatabaseSlowStatement();
    statement.setId(segmentCoreInfo.getSegmentId() + "-" + spanDecorator.getSpanId());
    statement.setDatabaseServiceId(sourceBuilder.getDestServiceId());
    statement.setLatency(sourceBuilder.getLatency());
    statement.setTimeBucket(TimeBucket.getRecordTimeBucket(segmentCoreInfo.getStartTime()));
    statement.setTraceId(traceId);

    int maxLength = config.getMaxSlowSQLLength();
    if (sqlStatement == null || StringUtil.isEmpty(sqlStatement)) {
        // Covers both an empty statement tag and a span that carries no statement tag at all,
        // so a queued record never has a null statement.
        statement.setStatement("[No statement]/" + sourceBuilder.getDestEndpointName());
    } else if (sqlStatement.length() > maxLength) {
        statement.setStatement(sqlStatement.substring(0, maxLength));
    } else {
        statement.setStatement(sqlStatement);
    }

    slowDatabaseAccesses.add(statement);
}

//......

/**
 * Forwards everything collected by the parse callbacks to the source receiver: first the
 * sources derived from entry builders, then those derived from exit builders, and finally
 * the collected slow database statements.
 */
@Override
public void build() {
    for (SourceBuilder entrySource : entrySourceBuilders) {
        entrySource.setTimeBucket(minuteTimeBucket);

        sourceReceiver.receive(entrySource.toAll());
        sourceReceiver.receive(entrySource.toService());
        sourceReceiver.receive(entrySource.toServiceInstance());
        sourceReceiver.receive(entrySource.toEndpoint());
        sourceReceiver.receive(entrySource.toServiceRelation());
        sourceReceiver.receive(entrySource.toServiceInstanceRelation());

        /*
         * The endpoint relation is optional. In SkyWalking Cross Process Propagation Headers
         * Protocol v2 the endpoint in a ref can be empty, in which case the relation can't be
         * established. Since 6.6.0 the source endpoint can also be none when the trace begins
         * with an internal task (local span or exit span), such as a Timer, instead of an
         * entry span like an RPC server side.
         */
        EndpointRelation relation = entrySource.toEndpointRelation();
        if (relation != null) {
            sourceReceiver.receive(relation);
        }
    }

    for (SourceBuilder exitSource : exitSourceBuilders) {
        // Exit sources originate from the entry span's endpoint when one was parsed.
        if (entrySpanDecorator == null) {
            exitSource.setSourceEndpointId(Const.USER_ENDPOINT_ID);
        } else {
            exitSource.setSourceEndpointId(entrySpanDecorator.getOperationNameId());
        }
        exitSource.setSourceEndpointName(
            endpointInventoryCache.get(exitSource.getSourceEndpointId()).getName());
        exitSource.setTimeBucket(minuteTimeBucket);

        sourceReceiver.receive(exitSource.toServiceRelation());
        sourceReceiver.receive(exitSource.toServiceInstanceRelation());
        if (RequestType.DATABASE.equals(exitSource.getType())) {
            sourceReceiver.receive(exitSource.toDatabaseAccess());
        }
    }

    for (DatabaseSlowStatement slowStatement : slowDatabaseAccesses) {
        sourceReceiver.receive(slowStatement);
    }
}

//......

}

  • MultiScopesSpanListener实现了EntrySpanListener、ExitSpanListener、GlobalTraceIdsListener接口,其parseExit方法在sourceBuilder.getType()为RequestType.DATABASE的时候会创建DatabaseSlowStatement,在tag.getKey()为SpanTags.DB_STATEMENT时设置其statement(超过config.getMaxSlowSQLLength()时会截断),在tag.getKey()为SpanTags.DB_TYPE时,通过config.getDbLatencyThresholdsAndWatcher()获取DBLatencyThresholdsAndWatcher,然后在latency大于threshold时更新isSlowDBAccess为true,最后仅在isSlowDBAccess为true时才将DatabaseSlowStatement添加到slowDatabaseAccesses中;其build方法通过slowDatabaseAccesses.forEach(sourceReceiver::receive)通知sourceReceiver

SourceReceiverImpl

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java

/**
 * {@link SourceReceiver} implementation that hands every received source to a
 * {@link DispatcherManager}.
 */
public class SourceReceiverImpl implements SourceReceiver {

    /** Dispatcher manager created together with this receiver; exposed through a generated getter. */
    @Getter
    private final DispatcherManager dispatcherManager = new DispatcherManager();

    /**
     * Forwards the source to the dispatcher manager.
     *
     * @param source source to forward
     */
    @Override
    public void receive(Source source) {
        dispatcherManager.forward(source);
    }

    /**
     * Delegates to {@code DispatcherManager.scan()}.
     *
     * @throws IOException            propagated from the dispatcher manager's scan
     * @throws InstantiationException propagated from the dispatcher manager's scan
     * @throws IllegalAccessException propagated from the dispatcher manager's scan
     */
    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        dispatcherManager.scan();
    }
}

  • SourceReceiverImpl实现了SourceReceiver接口,其receive方法执行dispatcherManager.forward(source)

小结

DatabaseSlowStatement继承了Source,它定义了id、databaseServiceId、statement、latency、traceId属性,其scope方法返回DefaultScopeDefine.DATABASE_SLOW_STATEMENT,getEntityId方法返回Const.EMPTY_STRING;MultiScopesSpanListener实现了EntrySpanListener、ExitSpanListener、GlobalTraceIdsListener接口,其parseExit方法在sourceBuilder.getType()为RequestType.DATABASE的时候会创建DatabaseSlowStatement,在tag.getKey()为SpanTags.DB_TYPE时,通过config.getDbLatencyThresholdsAndWatcher()获取DBLatencyThresholdsAndWatcher,然后在latency大于threshold时更新isSlowDBAccess为true,最后仅在isSlowDBAccess为true时才将DatabaseSlowStatement添加到slowDatabaseAccesses中;其build方法通过slowDatabaseAccesses.forEach(sourceReceiver::receive)通知sourceReceiver

doc

  • DatabaseSlowStatement

以上是 聊聊skywalking的DatabaseSlowStatement 的全部内容, 来源链接: utcz.com/z/515008.html

回到顶部