聊聊skywalking的SamplingService

编程

SamplingService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.java

@DefaultImplementor

public class SamplingService implements BootService {

private static final ILog logger = LogManager.getLogger(SamplingService.class);

private volatile boolean on = false;

private volatile AtomicInteger samplingFactorHolder;

private volatile ScheduledFuture<?> scheduledFuture;

@Override

public void prepare() throws Throwable {

}

@Override

public void boot() throws Throwable {

if (scheduledFuture != null) {

/**

* If {@link #boot()} invokes twice, mostly in test cases,

* cancel the old one.

*/

scheduledFuture.cancel(true);

}

if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {

on = true;

this.resetSamplingFactor();

ScheduledExecutorService service = Executors

.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));

scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {

@Override

public void run() {

resetSamplingFactor();

}

}, new RunnableWithExceptionProtection.CallbackWhenException() {

@Override public void handle(Throwable t) {

logger.error("unexpected exception.", t);

}

}), 0, 3, TimeUnit.SECONDS);

logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);

}

}

@Override

public void onComplete() throws Throwable {

}

@Override

public void shutdown() throws Throwable {

if (scheduledFuture != null) {

scheduledFuture.cancel(true);

}

}

/**

* @return true, if sampling mechanism is on, and getDefault the sampling factor successfully.

*/

public boolean trySampling() {

if (on) {

int factor = samplingFactorHolder.get();

if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {

boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);

return success;

} else {

return false;

}

}

return true;

}

/**

* Increase the sampling factor by force,

* to avoid sampling too many traces.

* If many distributed traces require sampled,

* the trace beginning at local, has less chance to be sampled.

*/

public void forceSampled() {

if (on) {

samplingFactorHolder.incrementAndGet();

}

}

private void resetSamplingFactor() {

samplingFactorHolder = new AtomicInteger(0);

}

}

  • SamplingService实现了BootService接口,其boot方法在Config.Agent.SAMPLE_N_PER_3_SECS大于0时标记on为true,然后先执行一下resetSamplingFactor,之后注册一个定时任务定时执行resetSamplingFactor;其shutdown方法会cancel掉这个定时任务;resetSamplingFactor方法会重置samplingFactorHolder为0;它还提供了trySampling方法给外部调用,该方法在samplingFactorHolder小于Config.Agent.SAMPLE_N_PER_3_SECS时,执行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法则在on的前提下执行samplingFactorHolder.incrementAndGet()

ContextManagerExtendService

skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java

@DefaultImplementor

public class ContextManagerExtendService implements BootService {

@Override public void prepare() {

}

@Override public void boot() {

}

@Override public void onComplete() {

}

@Override public void shutdown() {

}

public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {

AbstractTracerContext context;

int suffixIdx = operationName.lastIndexOf(".");

if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {

context = new IgnoredTracerContext();

} else {

SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);

if (forceSampling || samplingService.trySampling()) {

context = new TracingContext();

} else {

context = new IgnoredTracerContext();

}

}

return context;

}

}

  • ContextManagerExtendService实现了BootService接口,不过都是空方法,它提供了createTraceContext方法,该方法根据配置决定返回IgnoredTracerContext还是TracingContext;在非Config.Agent.IGNORE_SUFFIX的条件下,在forceSampling或者samplingService.trySampling()返回true时,会返回TracingContext

小结

SamplingService在Config.Agent.SAMPLE_N_PER_3_SECS大于0时会注册一个定时任务定时重置samplingFactorHolder为0;它还提供了它还提供了trySampling方法给外部调用,该方法在samplingFactorHolder小于Config.Agent.SAMPLE_N_PER_3_SECS时,执行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法则在on的前提下执行samplingFactorHolder.incrementAndGet();ContextManagerExtendService的createTraceContext会通过ServiceManager.INSTANCE.findService然后在forceSampling或者samplingService.trySampling()为true时返回TracingContext,其余的返回IgnoredTracerContext

doc

  • SamplingService
  • ContextManagerExtendService

以上是 聊聊skywalking的SamplingService 的全部内容, 来源链接: utcz.com/z/513940.html

回到顶部