聊聊skywalking的SamplingService
SamplingService
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/sampling/SamplingService.java
@DefaultImplementorpublic class SamplingService implements BootService {
private static final ILog logger = LogManager.getLogger(SamplingService.class);
private volatile boolean on = false;
private volatile AtomicInteger samplingFactorHolder;
private volatile ScheduledFuture<?> scheduledFuture;
@Override
public void prepare() throws Throwable {
}
@Override
public void boot() throws Throwable {
if (scheduledFuture != null) {
/**
* If {@link #boot()} invokes twice, mostly in test cases,
* cancel the old one.
*/
scheduledFuture.cancel(true);
}
if (Config.Agent.SAMPLE_N_PER_3_SECS > 0) {
on = true;
this.resetSamplingFactor();
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("SamplingService"));
scheduledFuture = service.scheduleAtFixedRate(new RunnableWithExceptionProtection(new Runnable() {
@Override
public void run() {
resetSamplingFactor();
}
}, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, 3, TimeUnit.SECONDS);
logger.debug("Agent sampling mechanism started. Sample {} traces in 3 seconds.", Config.Agent.SAMPLE_N_PER_3_SECS);
}
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
}
/**
* @return true, if sampling mechanism is on, and getDefault the sampling factor successfully.
*/
public boolean trySampling() {
if (on) {
int factor = samplingFactorHolder.get();
if (factor < Config.Agent.SAMPLE_N_PER_3_SECS) {
boolean success = samplingFactorHolder.compareAndSet(factor, factor + 1);
return success;
} else {
return false;
}
}
return true;
}
/**
* Increase the sampling factor by force,
* to avoid sampling too many traces.
* If many distributed traces require sampled,
* the trace beginning at local, has less chance to be sampled.
*/
public void forceSampled() {
if (on) {
samplingFactorHolder.incrementAndGet();
}
}
private void resetSamplingFactor() {
samplingFactorHolder = new AtomicInteger(0);
}
}
- SamplingService实现了BootService接口,其boot方法在Config.Agent.SAMPLE_N_PER_3_SECS大于0时标记on为true,然后先执行一下resetSamplingFactor,之后注册一个定时任务定时执行resetSamplingFactor;其shutdown方法会cancel掉这个定时任务;resetSamplingFactor方法会重置samplingFactorHolder为0;它还提供了trySampling方法给外部调用,该方法在samplingFactorHolder小于Config.Agent.SAMPLE_N_PER_3_SECS时,执行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法则在on的前提下执行samplingFactorHolder.incrementAndGet()
ContextManagerExtendService
skywalking-6.6.0/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
@DefaultImplementorpublic class ContextManagerExtendService implements BootService {
@Override public void prepare() {
}
@Override public void boot() {
}
@Override public void onComplete() {
}
@Override public void shutdown() {
}
public AbstractTracerContext createTraceContext(String operationName, boolean forceSampling) {
AbstractTracerContext context;
int suffixIdx = operationName.lastIndexOf(".");
if (suffixIdx > -1 && Config.Agent.IGNORE_SUFFIX.contains(operationName.substring(suffixIdx))) {
context = new IgnoredTracerContext();
} else {
SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
context = new TracingContext();
} else {
context = new IgnoredTracerContext();
}
}
return context;
}
}
- ContextManagerExtendService实现了BootService接口,不过都是空方法,它提供了createTraceContext方法,该方法根据配置决定返回IgnoredTracerContext还是TracingContext;在非Config.Agent.IGNORE_SUFFIX的条件下,在forceSampling或者samplingService.trySampling()返回true时,会返回TracingContext
小结
SamplingService在Config.Agent.SAMPLE_N_PER_3_SECS大于0时会注册一个定时任务定时重置samplingFactorHolder为0;它还提供了它还提供了trySampling方法给外部调用,该方法在samplingFactorHolder小于Config.Agent.SAMPLE_N_PER_3_SECS时,执行samplingFactorHolder.compareAndSet(factor, factor + 1);forceSampled方法则在on的前提下执行samplingFactorHolder.incrementAndGet();ContextManagerExtendService的createTraceContext会通过ServiceManager.INSTANCE.findService然后在forceSampling或者samplingService.trySampling()为true时返回TracingContext,其余的返回IgnoredTracerContext
doc
- SamplingService
- ContextManagerExtendService
以上是 聊聊skywalking的SamplingService 的全部内容, 来源链接: utcz.com/z/513940.html