聊聊skywalking的HTTPAccessLog

编程

HTTPAccessLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/HTTPAccessLog.java

@ScopeDeclaration(id = HTTP_ACCESS_LOG, name = "HTTPAccessLog")

public class HTTPAccessLog extends AbstractLog {

@Override public int scope() {

return HTTP_ACCESS_LOG;

}

}

  • HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG

AbstractLog

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/AbstractLog.java

@Setter

@Getter

public abstract class AbstractLog extends Source {

private long timeBucket;

private long timestamp;

private int serviceId;

private int serviceInstanceId;

private int endpointId;

private String traceId;

private int isError;

private String statusCode;

private ContentType contentType = ContentType.NONE;

private String content;

@Override public String getEntityId() {

throw new UnexpectedException("getEntityId is not supported in AbstractLog source");

}

}

  • AbstractLog继承了Source,它定义了timeBucket、timestamp、serviceId、serviceInstanceId、endpointId、traceId、isError、statusCode、contentType、content属性

HTTPAccessLogDispatcher

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/log/HTTPAccessLogDispatcher.java

public class HTTPAccessLogDispatcher implements SourceDispatcher<HTTPAccessLog> {

@Override public void dispatch(HTTPAccessLog source) {

HTTPAccessLogRecord record = new HTTPAccessLogRecord();

record.setTimestamp(source.getTimestamp());

record.setTimeBucket(source.getTimeBucket());

record.setServiceId(source.getServiceId());

record.setServiceInstanceId(source.getServiceInstanceId());

record.setEndpointId(source.getEndpointId());

record.setTraceId(source.getTraceId());

record.setIsError(source.getIsError());

record.setStatusCode(source.getStatusCode());

record.setContentType(source.getContentType().value());

record.setContent(source.getContent());

RecordStreamProcessor.getInstance().in(record);

}

}

  • HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)

RecordStreamProcessor

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordStreamProcessor.java

public class RecordStreamProcessor implements StreamProcessor<Record> {

private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();

private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();

public static RecordStreamProcessor getInstance() {

return PROCESSOR;

}

public void in(Record record) {

RecordPersistentWorker worker = workers.get(record.getClass());

if (worker != null) {

worker.in(record);

}

}

@SuppressWarnings("unchecked")

public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {

if (DisableRegister.INSTANCE.include(stream.name())) {

return;

}

StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);

IRecordDAO recordDAO;

try {

recordDAO = storageDAO.newRecordDao(stream.builder().newInstance());

} catch (InstantiationException | IllegalAccessException e) {

throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);

}

IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);

Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);

RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);

workers.put(recordClass, persistentWorker);

}

}

  • RecordStreamProcessor实现了StreamProcessor接口,其in方法从workers中找出record.getClass()对应的RecordPersistentWorker,然后执行其in方法

RecordPersistentWorker

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java

public class RecordPersistentWorker extends AbstractWorker<Record> {

private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);

private final Model model;

private final IRecordDAO recordDAO;

private final IBatchDAO batchDAO;

RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {

super(moduleDefineHolder);

this.model = model;

this.recordDAO = recordDAO;

this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);

}

@Override public void in(Record record) {

try {

InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);

batchDAO.asynchronous(insertRequest);

} catch (IOException e) {

logger.error(e.getMessage(), e);

}

}

}

  • RecordPersistentWorker继承了AbstractWorker,其in方法执行recordDAO.prepareBatchInsert(model, record),然后用返回的insertRequest执行batchDAO.asynchronous(insertRequest)

小结

HTTPAccessLog继承了AbstractLog,其scope方法返回的是HTTP_ACCESS_LOG;HTTPAccessLogDispatcher实现了SourceDispatcher接口,其dispatch将HTTPAccessLog转换为HTTPAccessLogRecord,然后执行RecordStreamProcessor.getInstance().in(record)

doc

  • HTTPAccessLog

以上是 聊聊skywalking的HTTPAccessLog 的全部内容, 来源链接: utcz.com/z/514968.html

回到顶部