聊聊skywalking的TopNDatabaseStatement

编程

TopNDatabaseStatement

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/TopNDatabaseStatement.java

/**
 * Top-N record for slow database statements.
 *
 * <p>Registered as a stream under {@link #INDEX_NAME} with the
 * {@code DATABASE_SLOW_STATEMENT} scope, persisted through {@link Builder} and
 * routed through {@code TopNStreamProcessor}.
 *
 * <p>Equality and hashing consider the service id only, so two samples that
 * belong to the same service compare as equal regardless of their statement,
 * latency or trace id.
 */
@Stream(
    name = TopNDatabaseStatement.INDEX_NAME,
    scopeId = DefaultScopeDefine.DATABASE_SLOW_STATEMENT,
    builder = TopNDatabaseStatement.Builder.class,
    processor = TopNStreamProcessor.class
)
public class TopNDatabaseStatement extends TopN {

    public static final String INDEX_NAME = "top_n_database_statement";

    @Setter
    private String id;

    /**
     * @return the identifier previously assigned through the generated setter
     */
    @Override
    public String id() {
        return id;
    }

    /**
     * Compares by runtime class and service id only.
     *
     * @param o the object to compare against, may be {@code null}
     * @return {@code true} when {@code o} is this instance, or an instance of
     *         exactly the same class carrying the same service id
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o != null && getClass() == o.getClass()) {
            TopNDatabaseStatement other = (TopNDatabaseStatement) o;
            return getServiceId() == other.getServiceId();
        }
        return false;
    }

    /**
     * @return a hash derived from the service id alone, matching {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        return Objects.hash(getServiceId());
    }

    /**
     * Converts between {@link TopNDatabaseStatement} and its column-name keyed
     * map representation.
     */
    public static class Builder implements StorageBuilder<TopNDatabaseStatement> {

        /**
         * Rebuilds a record from a column map.
         *
         * <p>Only statement, trace id, latency, service id and time bucket are
         * read; the record's {@code id} is not populated here.
         *
         * @param dbMap column values keyed by column name
         * @return a new record filled from {@code dbMap}
         */
        @Override
        public TopNDatabaseStatement map2Data(Map<String, Object> dbMap) {
            // Values are read in the same order the fields are assigned.
            String sql = (String) dbMap.get(STATEMENT);
            String traceId = (String) dbMap.get(TRACE_ID);
            long latency = ((Number) dbMap.get(LATENCY)).longValue();
            int serviceId = ((Number) dbMap.get(SERVICE_ID)).intValue();
            long timeBucket = ((Number) dbMap.get(TIME_BUCKET)).longValue();

            TopNDatabaseStatement result = new TopNDatabaseStatement();
            result.setStatement(sql);
            result.setTraceId(traceId);
            result.setLatency(latency);
            result.setServiceId(serviceId);
            result.setTimeBucket(timeBucket);
            return result;
        }

        /**
         * Flattens a record into a column map.
         *
         * @param storageData the record to serialize
         * @return a new mutable map holding statement, trace id, latency,
         *         service id and time bucket
         */
        @Override
        public Map<String, Object> data2Map(TopNDatabaseStatement storageData) {
            Map<String, Object> columns = new HashMap<>();
            columns.put(STATEMENT, storageData.getStatement());
            columns.put(TRACE_ID, storageData.getTraceId());
            columns.put(LATENCY, storageData.getLatency());
            columns.put(SERVICE_ID, storageData.getServiceId());
            columns.put(TIME_BUCKET, storageData.getTimeBucket());
            return columns;
        }
    }
}

  • TopNDatabaseStatement继承了TopN,其equals及hashCode方法仅基于serviceId;它定义了Builder,Builder实现了StorageBuilder接口的map2Data及data2Map方法

TopN

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/topn/TopN.java

/**
 * Base record for top-N samples: a statement together with its latency, the
 * trace it was observed in and the owning service id.
 *
 * <p>Instances are ordered by {@code latency} (see {@link #compareTo(Object)}).
 */
public abstract class TopN extends Record implements ComparableStorageData {

    public static final String STATEMENT = "statement";

    public static final String LATENCY = "latency";

    public static final String TRACE_ID = "trace_id";

    public static final String SERVICE_ID = "service_id";

    @Getter @Setter @Column(columnName = STATEMENT, content = true) private String statement;

    @Getter @Setter @Column(columnName = LATENCY) private long latency;

    @Getter @Setter @Column(columnName = TRACE_ID) private String traceId;

    @Getter @Setter @Column(columnName = SERVICE_ID) private int serviceId;

    /**
     * Orders two top-N records by latency.
     *
     * @param o the record to compare with; must be a {@code TopN}
     * @return a negative value, zero, or a positive value when this record's
     *         latency is respectively lower than, equal to, or higher than the
     *         latency of {@code o}
     * @throws ClassCastException if {@code o} is not a {@code TopN}
     */
    @Override
    public int compareTo(Object o) {
        TopN target = (TopN) o;
        // Do not subtract and narrow to int: a long difference that does not
        // fit in 32 bits is truncated, which can yield 0 or the wrong sign.
        return Long.compare(latency, target.latency);
    }
}

  • TopN继承了Record并实现了ComparableStorageData接口,定义了statement、latency、trace_id、service_id属性;其compareTo方法按latency进行比较

DatabaseStatementDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/database/DatabaseStatementDispatcher.java

/**
 * Dispatcher that turns each {@code DatabaseSlowStatement} source into a
 * {@code TopNDatabaseStatement} and hands it to {@code TopNStreamProcessor}.
 */
public class DatabaseStatementDispatcher implements SourceDispatcher<DatabaseSlowStatement> {

    /**
     * Copies id, service id, latency, statement, time bucket and trace id from
     * the source into a fresh {@code TopNDatabaseStatement}, then submits it to
     * the shared {@code TopNStreamProcessor} instance.
     *
     * @param source the slow statement sample to forward
     */
    @Override
    public void dispatch(DatabaseSlowStatement source) {
        final TopNDatabaseStatement topN = new TopNDatabaseStatement();

        topN.setId(source.getId());
        topN.setServiceId(source.getDatabaseServiceId());
        topN.setLatency(source.getLatency());
        topN.setStatement(source.getStatement());
        topN.setTimeBucket(source.getTimeBucket());
        topN.setTraceId(source.getTraceId());

        final TopNStreamProcessor processor = TopNStreamProcessor.getInstance();
        processor.in(topN);
    }
}

  • DatabaseStatementDispatcher实现了SourceDispatcher接口,其dispatch方法将DatabaseSlowStatement转换为TopNDatabaseStatement,然后执行TopNStreamProcessor.getInstance().in(statement)

TopNStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/TopNStreamProcessor.java

/**
 * Stream processor for {@code TopN} records.
 *
 * <p>A single shared instance is exposed through {@link #getInstance()}. For
 * every top-N class registered via {@link #create}, one {@code TopNWorker} is
 * built and remembered; {@link #in(TopN)} later routes each record to the
 * worker registered for the record's runtime class.
 */
public class TopNStreamProcessor implements StreamProcessor<TopN> {

    private static final TopNStreamProcessor PROCESSOR = new TopNStreamProcessor();

    /** Every worker created so far, in creation order. */
    @Getter
    private List<TopNWorker> persistentWorkers = new ArrayList<>();

    /** Worker lookup keyed by the top-N class passed to {@link #create}. */
    private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();

    // Multiplied by 60 * 1000 before being handed to the worker; presumably
    // minutes converted to milliseconds -- confirm against TopNWorker.
    @Setter
    @Getter
    private int topNWorkerReportCycle = 10;

    /** Size value passed to each newly created {@code TopNWorker}. */
    @Setter
    @Getter
    private int topSize = 50;

    /**
     * @return the shared processor instance
     */
    public static TopNStreamProcessor getInstance() {
        return PROCESSOR;
    }

    /**
     * Creates and registers the worker for one top-N stream.
     *
     * <p>Does nothing when the stream name is listed in {@code DisableRegister}.
     *
     * @param moduleDefineHolder used to look up the storage and core module services
     * @param stream             the stream annotation describing name, scope and builder
     * @param topNClass          the record class the new worker will serve
     * @throws UnexpectedException if the stream's builder cannot be instantiated
     */
    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends TopN> topNClass) {
        final boolean disabled = DisableRegister.INSTANCE.include(stream.name());
        if (disabled) {
            return;
        }

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME)
                                                  .provider()
                                                  .getService(StorageDAO.class);

        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());
        } catch (InstantiationException | IllegalAccessException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " top n record DAO failure.", e);
        }

        IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME)
                                                     .provider()
                                                     .getService(IModelSetter.class);
        Storage storage = new Storage(stream.name(), true, true, Downsampling.Second);
        Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), storage, true);

        long reportPeriod = topNWorkerReportCycle * 60 * 1000L;
        TopNWorker created = new TopNWorker(moduleDefineHolder, model, topSize, reportPeriod, recordDAO);

        persistentWorkers.add(created);
        workers.put(topNClass, created);
    }

    /**
     * Forwards a record to the worker registered for its runtime class.
     * Records whose class has no registered worker are ignored.
     *
     * @param topN the record to process
     */
    public void in(TopN topN) {
        TopNWorker target = workers.get(topN.getClass());
        if (target == null) {
            return;
        }
        target.in(topN);
    }
}

  • TopNStreamProcessor实现了StreamProcessor接口,其create方法为每个TopN类型创建TopNWorker并放入workers;其in方法根据topN的class从workers中获取TopNWorker,若存在则执行worker.in(topN)

小结

TopNDatabaseStatement继承了TopN,定义了Builder;DatabaseStatementDispatcher实现了SourceDispatcher接口,其dispatch方法将DatabaseSlowStatement转换为TopNDatabaseStatement,然后执行TopNStreamProcessor.getInstance().in(statement);TopNStreamProcessor的in方法再根据topN的class找到对应的TopNWorker,并执行worker.in(topN)

doc

  • TopNDatabaseStatement

以上是 聊聊skywalking的TopNDatabaseStatement 的全部内容, 来源链接: utcz.com/z/515041.html

回到顶部