[Java] hystrix - How Fallback Gets Called


Posted by 大军, today at 15:18

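Fallback is called in the following cases:

  • The circuit breaker is open
  • The semaphore rejects the request
  • The thread pool rejects the request
  • The command's execution fails (or times out)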

From the earlier post "hystrix - All About @EnableCircuitBreaker" we know that the call eventually reaches the execute method of HystrixCommand, and that execute in turn calls queue.

public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

The queue method is shown below. It obtains a Future object, but the part that matters is the toObservable method.

public Future<R> queue() {
    // rest omitted
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // rest omitted
}
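The relationship between execute and queue can be sketched with plain JDK types. This is only an illustration, not Hystrix code: BlockingCommandSketch is a hypothetical class, and CompletableFuture stands in for the Future that Hystrix builds from its Observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a command; not part of Hystrix.
class BlockingCommandSketch<R> {
    private final Supplier<R> work;

    BlockingCommandSketch(Supplier<R> work) {
        this.work = work;
    }

    // Asynchronous entry point: start the work and hand back a Future.
    Future<R> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: the same path, followed by a blocking get().
    R execute() {
        try {
            return queue().get();
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure, not the Future wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BlockingCommandSketch<String> command = new BlockingCommandSketch<String>(() -> "ok");
        System.out.println(command.execute()); // prints "ok"
    }
}
```

The point of the sketch is that the synchronous call adds nothing of its own: everything interesting happens on the path behind queue.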

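This is where the Observable is defined. It first checks whether the value can be served from the request cache. If not, afterCache ends up wrapping applyHystrixSemantics, so the applyHystrixSemantics method is what runs next.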

public Observable<R> toObservable() {
    // rest omitted
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // rest omitted
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // rest omitted
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }

            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }

            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
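The cache handling above follows a "look up, then putIfAbsent, then let the winner execute" pattern. Below is a minimal sketch of that pattern using only JDK classes. RequestCacheSketch and getOrExecute are hypothetical names, a null cache key stands in for "caching disabled", and a CompletableFuture replaces the cached Observable, so this illustrates the idea rather than Hystrix's actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch of the request-cache pattern; not part of Hystrix.
class RequestCacheSketch<R> {
    private final ConcurrentMap<String, CompletableFuture<R>> cache = new ConcurrentHashMap<>();

    CompletableFuture<R> getOrExecute(String cacheKey, Supplier<R> work) {
        boolean cachingEnabled = cacheKey != null; // assumption: null key means "no caching"

        // Try the cache first.
        if (cachingEnabled) {
            CompletableFuture<R> fromCache = cache.get(cacheKey);
            if (fromCache != null) {
                return fromCache;
            }
        }

        // Register a not-yet-completed result before doing any work.
        CompletableFuture<R> toCache = new CompletableFuture<>();
        if (cachingEnabled) {
            CompletableFuture<R> existing = cache.putIfAbsent(cacheKey, toCache);
            if (existing != null) {
                // Another thread beat us, so its result is used and our work never runs.
                return existing;
            }
        }

        // Only the caller that won the race executes the work.
        try {
            toCache.complete(work.get());
        } catch (RuntimeException e) {
            toCache.completeExceptionally(e);
        }
        return toCache;
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        RequestCacheSketch<String> cache = new RequestCacheSketch<>();
        cache.getOrExecute("user-1", () -> "v" + runs.incrementAndGet());
        cache.getOrExecute("user-1", () -> "v" + runs.incrementAndGet());
        System.out.println(runs.get()); // prints 1: the second call was served from the cache
    }
}
```

Registering the placeholder before executing is what lets concurrent callers with the same key share one execution instead of each running their own.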

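Circuit breaker check

This is the most important piece of code. It first checks whether the circuit breaker is open; if it is, handleShortCircuitViaFallback is called. If the circuit is not open, it then tries to acquire the execution semaphore, and if that fails it calls handleSemaphoreRejectionViaFallback. Only when both checks pass does it call executeCommandAndObserve.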

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // rest omitted
    if (circuitBreaker.allowRequest()) {
        // rest omitted
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}
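The order of the two checks can be sketched as follows. This is a simplified illustration built on java.util.concurrent.Semaphore; SemanticsOrderSketch, its boolean circuitOpen flag, and the returned strings are all hypothetical, and a real circuit breaker tracks failure statistics rather than a flag.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical sketch of the check order; not part of Hystrix.
class SemanticsOrderSketch {
    private final Semaphore executionSemaphore;
    private volatile boolean circuitOpen; // assumption: a plain flag replaces the real breaker

    SemanticsOrderSketch(int maxConcurrentRequests) {
        this.executionSemaphore = new Semaphore(maxConcurrentRequests);
    }

    void setCircuitOpen(boolean open) {
        this.circuitOpen = open;
    }

    String run(Supplier<String> command) {
        // 1. Circuit breaker first: an open circuit never touches the semaphore.
        if (circuitOpen) {
            return "fallback: short-circuited";
        }
        // 2. Then the semaphore: tryAcquire never blocks, it just reports failure.
        if (!executionSemaphore.tryAcquire()) {
            return "fallback: semaphore rejected";
        }
        // 3. Both checks passed, so run the command and always release the permit.
        try {
            return command.get();
        } finally {
            executionSemaphore.release();
        }
    }

    public static void main(String[] args) {
        SemanticsOrderSketch sketch = new SemanticsOrderSketch(1);
        System.out.println(sketch.run(() -> "ok"));                      // ok
        System.out.println(sketch.run(() -> sketch.run(() -> "inner"))); // fallback: semaphore rejected
        sketch.setCircuitOpen(true);
        System.out.println(sketch.run(() -> "ok"));                      // fallback: short-circuited
    }
}
```

In the second call the outer command holds the only permit, so the nested call is rejected without waiting.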

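Circuit open

Let's first look at what happens once the circuit is open. handleShortCircuitViaFallback calls getFallbackOrThrowException, which obtains fallbackExecutionChain from getFallbackObservable. The main job of getFallbackObservable is to call the getFallback method.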

private Observable<R> handleShortCircuitViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
            "short-circuited", shortCircuitException);
    // rest omitted
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        // rest omitted
        fallbackExecutionChain = getFallbackObservable();
        // rest omitted
        return fallbackExecutionChain
                .doOnEach(setRequestContext)
                .lift(new FallbackHookApplication(_cmd))
                .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                .doOnNext(markFallbackEmit)
                .doOnCompleted(markFallbackCompleted)
                .onErrorResumeNext(handleFallbackError)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    }
    // rest omitted
}

@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

getFallback is the entry point for calling the fallback method. MethodExecutionAction then uses reflection to invoke the method we configured.

@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}
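To make "invoke the configured method via reflection" concrete, here is a minimal sketch using java.lang.reflect.Method. All names (ReflectiveFallbackSketch, UserService, findUser, findUserFallback) are made up for the example, and it does not reproduce how MethodExecutionAction resolves methods. It also derives parameter types from the runtime argument classes, which only works for non-null arguments whose class matches the declared parameter type exactly.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical sketch of reflective fallback invocation; not part of Hystrix.
class ReflectiveFallbackSketch {

    // Hypothetical service: the primary method always fails, the fallback never does.
    public static class UserService {
        public String findUser(String id) {
            throw new IllegalStateException("remote call failed");
        }

        public String findUserFallback(String id) {
            return "default-user-for-" + id;
        }
    }

    // Look up a public method by name and argument types, then invoke it.
    static Object invoke(Object target, String methodName, Object... methodArgs) {
        Class<?>[] parameterTypes = new Class<?>[methodArgs.length];
        for (int i = 0; i < methodArgs.length; i++) {
            parameterTypes[i] = methodArgs[i].getClass();
        }
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, methodArgs);
        } catch (InvocationTargetException e) {
            // The invoked method itself threw, so surface its exception.
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot invoke " + methodName, e);
        }
    }

    // Call the primary method; on failure call the fallback with the same arguments.
    static Object callWithFallback(Object target, String methodName, String fallbackName, Object... methodArgs) {
        try {
            return invoke(target, methodName, methodArgs);
        } catch (RuntimeException primaryFailure) {
            return invoke(target, fallbackName, methodArgs);
        }
    }

    public static void main(String[] args) {
        Object result = callWithFallback(new UserService(), "findUser", "findUserFallback", "42");
        System.out.println(result); // prints "default-user-for-42"
    }
}
```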

Semaphore isolation

After the circuit-open code above, this case is simple: just like before, it calls getFallbackOrThrowException.

private Observable<R> handleSemaphoreRejectionViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}

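Thread pool isolation

If the circuit is not open and the semaphore can be acquired, executeCommandAndObserve is called. Inside it, handleFallback deals with several kinds of failure: thread pool rejection, timeout, bad request, and every other exception. The method then calls executeCommandWithSpecifiedIsolation.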

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // rest omitted
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    // rest omitted
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
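The branching inside handleFallback boils down to "pick a handler by exception type". The sketch below shows that dispatch with JDK exception types only. FailureDispatchSketch and BadRequestException are hypothetical, with BadRequestException standing in for HystrixBadRequestException and java.util.concurrent.TimeoutException standing in for HystrixTimeoutException. The returned strings merely label which branch was taken.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of dispatching on the failure type; not part of Hystrix.
class FailureDispatchSketch {

    // Hypothetical stand-in for HystrixBadRequestException.
    static class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static String handle(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            // The thread pool refused the task.
            return "fallback: thread pool rejected";
        } else if (t instanceof TimeoutException) {
            // The command took too long.
            return "fallback: timed out";
        } else if (t instanceof BadRequestException) {
            // A bad request is the caller's error: propagate it, no fallback.
            throw (BadRequestException) t;
        } else {
            // Anything else is an ordinary execution failure.
            return "fallback: execution failed";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle(new RejectedExecutionException("pool full"))); // fallback: thread pool rejected
        System.out.println(handle(new TimeoutException("too slow")));            // fallback: timed out
        System.out.println(handle(new IllegalStateException("boom")));           // fallback: execution failed
    }
}
```

Note that the bad-request branch is the only one that skips the fallback, which matches the source above where that branch emits the error directly.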

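This method does two main things. One is the subscribeOn call, which is what provides thread pool isolation. The other is the call made in the normal case, getUserExecutionObservable, which is covered after the thread pool discussion.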

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // rest omitted
                return getUserExecutionObservable(_cmd);
                // rest omitted
            }
        }) // rest omitted
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // rest omitted
}

Creating the related classes

public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // apply the dynamic thread pool configuration
    touchConfig();
    // create a HystrixContextScheduler, which in turn creates a ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

Here a HystrixContextScheduler object is created. Its ThreadPoolScheduler is used to create a ThreadPoolWorker, which is then handed to a HystrixContextSchedulerWorker in createWorker.

public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

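The thread pool check lives in the schedule method of HystrixContextSchedulerWorker. If there is no queue space left, it throws a RejectedExecutionException, and how that exception is handled was already covered above.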

public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // this is where thread pool capacity is checked
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
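The "check queue space before scheduling" idea can be reproduced with a plain ThreadPoolExecutor. The sketch below is an assumption-laden illustration: QueueSpaceCheckSketch and its queueRejectionThreshold field are hypothetical, and the check simply compares the current queue length with a threshold, which may differ from what Hystrix's isQueueSpaceAvailable does in detail.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of rejecting work before it reaches the pool; not part of Hystrix.
class QueueSpaceCheckSketch {
    private final ThreadPoolExecutor pool;
    private final int queueRejectionThreshold;

    QueueSpaceCheckSketch(int threads, int queueCapacity, int queueRejectionThreshold) {
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        this.queueRejectionThreshold = queueRejectionThreshold;
    }

    // Assumption: space is available while the queue is below the threshold.
    boolean isQueueSpaceAvailable() {
        return pool.getQueue().size() < queueRejectionThreshold;
    }

    Future<?> schedule(Runnable action) {
        // Reject up front instead of letting the task wait in a long queue.
        if (!isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("queue is at the rejection threshold");
        }
        return pool.submit(action);
    }

    void shutdown() {
        pool.shutdownNow();
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                gate.await(); // keep the worker busy until the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        QueueSpaceCheckSketch sketch = new QueueSpaceCheckSketch(1, 10, 1);
        sketch.schedule(blocker); // taken directly by the single worker thread
        sketch.schedule(blocker); // waits in the queue, which now holds one task
        try {
            sketch.schedule(blocker); // queue size 1 is not below threshold 1
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        } finally {
            gate.countDown();
            sketch.shutdown();
        }
    }
}
```

The threshold is deliberately separate from the queue's real capacity, so rejection can start before the queue is physically full.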

Normal invocation

If all of the checks above pass, getUserExecutionObservable is called, and this method eventually calls run.

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // rest omitted
    userObservable = getExecutionObservable();
    // rest omitted
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }) // rest omitted
}

Here too, our method is invoked through reflection.

protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

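Summary

The sections above covered the different paths that lead to a fallback call, as well as the normal execution path. The overall flow is shown in the flowchart below.
[Figure: flowchart of how hystrix calls Fallback]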


This is an original work, licensed under the Attribution-NonCommercial-NoDerivatives 4.0 International license.


Source: utcz.com/a/108050.html
