[Java] Hystrix: How Is the Fallback Invoked?
Posted by 大军, today at 15:18
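A fallback is invoked in the following cases:
- the circuit breaker is open
- the semaphore rejects the request
- the thread pool rejects the request
- the command's execution fails
From the earlier post "hystrix - @EnableCircuitBreaker那些事" we know that the call eventually reaches HystrixCommand's execute method, which in turn calls queue.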
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
The queue method is shown below. It obtains a Future, but the part that matters is the toObservable call.
public Future<R> queue() {
    // rest omitted
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    // rest omitted
}
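The relationship between execute and queue can be illustrated without Hystrix at all. The following is a minimal sketch in plain Java, assuming a hypothetical BlockingCommandSketch class that uses CompletableFuture in place of the Observable chain; it is not Hystrix's implementation, only the same "synchronous call blocks on the asynchronous one" shape.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a command; not a Hystrix class.
class BlockingCommandSketch {
    private final Supplier<String> work;

    BlockingCommandSketch(Supplier<String> work) {
        this.work = work;
    }

    // Asynchronous entry point: start the work and hand back a Future.
    Future<String> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Synchronous entry point: block on the Future returned by queue().
    String execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // Unwrap so callers see the original failure as the cause.
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(new BlockingCommandSketch(() -> "hello").execute());
    }
}
```

The point of the design is that only one execution path has to be maintained: the blocking variant is a thin wrapper over the asynchronous one.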
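toObservable builds the Observable. It first checks whether the value can be served from the request cache. If not, afterCache is built from hystrixObservable, which wraps applyHystrixSemantics, so the applyHystrixSemantics method is what gets called next.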
public Observable<R> toObservable() {
    // rest omitted
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    // rest omitted
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            // rest omitted
            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();
            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);
            Observable<R> afterCache;
            // put in cache
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
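The "check the cache, otherwise putIfAbsent and see whether another thread beat us" pattern in toObservable can be tried out in isolation. Below is a rough sketch using only the JDK, assuming a hypothetical RequestCacheSketch class that stores FutureTask objects where Hystrix stores cached Observables; the names and types are illustrative, not Hystrix API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Supplier;

// Hypothetical request cache; a FutureTask plays the role of the cached Observable.
class RequestCacheSketch {
    private final ConcurrentMap<String, Future<String>> cache = new ConcurrentHashMap<>();

    String get(String cacheKey, Supplier<String> command) {
        // 1. Try the cache first.
        Future<String> fromCache = cache.get(cacheKey);
        if (fromCache != null) {
            return await(fromCache);
        }
        // 2. Wrap the not-yet-started work and try to publish it.
        FutureTask<String> toCache = new FutureTask<>(command::get);
        Future<String> previous = cache.putIfAbsent(cacheKey, toCache);
        if (previous != null) {
            // Another thread beat us, so use its entry and never run ours.
            return await(previous);
        }
        // 3. We won the race: run the work exactly once.
        toCache.run();
        return await(toCache);
    }

    private static String await(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Publishing the wrapper before running it is what lets concurrent callers with the same key share a single execution.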
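Circuit breaker check
This is the most important piece of code. It first asks the circuit breaker whether the request is allowed. If the circuit is open, it calls handleShortCircuitViaFallback. If the circuit is not open, it then tries to acquire a semaphore permit, and calls handleSemaphoreRejectionViaFallback if that fails. If both checks pass, it calls executeCommandAndObserve.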
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // rest omitted
    if (circuitBreaker.allowRequest()) {
        // rest omitted
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}
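The order of the checks in applyHystrixSemantics is easier to see in a stripped-down form. The sketch below is a simplified model in plain Java, assuming a hypothetical SemanticsSketch class in which a boolean stands in for circuitBreaker.allowRequest() and java.util.concurrent.Semaphore stands in for the execution semaphore; the returned strings are made up for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical model of the check order; not Hystrix code.
class SemanticsSketch {
    static String apply(boolean circuitAllowsRequest, Semaphore semaphore, Supplier<String> command) {
        // 1. Circuit open: never touch the semaphore or the command.
        if (!circuitAllowsRequest) {
            return "fallback: short-circuited";
        }
        // 2. Circuit closed but no permit available: reject.
        if (!semaphore.tryAcquire()) {
            return "fallback: semaphore rejected";
        }
        // 3. Both checks passed: run the command and always give the permit back.
        try {
            return command.get();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        System.out.println(apply(false, new Semaphore(1), () -> "ok"));
        System.out.println(apply(true, new Semaphore(0), () -> "ok"));
        System.out.println(apply(true, new Semaphore(1), () -> "ok"));
    }
}
```

Releasing the permit in a finally block plays the same role as the singleSemaphoreRelease hooks attached with doOnTerminate and doOnUnsubscribe.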
Circuit open
Let's first look at what happens once the circuit is open. getFallbackOrThrowException obtains fallbackExecutionChain from getFallbackObservable, whose main job is to call the getFallback method.
private Observable<R> handleShortCircuitViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT,
            "short-circuited", shortCircuitException);
    // rest omitted
}
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
        // rest omitted
        fallbackExecutionChain = getFallbackObservable();
        // rest omitted
        return fallbackExecutionChain
                .doOnEach(setRequestContext)
                .lift(new FallbackHookApplication(_cmd))
                .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                .doOnNext(markFallbackEmit)
                .doOnCompleted(markFallbackCompleted)
                .onErrorResumeNext(handleFallbackError)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    }
    // rest omitted
}
@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}
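getFallbackObservable wraps the getFallback call in defer, so nothing runs until someone subscribes, and a thrown exception becomes an error value instead of propagating. The following is a loose analogy in plain Java, assuming a hypothetical DeferredFallbackSketch class where a Supplier plays the role of the deferred Observable and an Outcome object plays the role of the emitted value or error; it does not use RxJava.

```java
import java.util.function.Supplier;

// Hypothetical analogy for defer + just/error; not RxJava or Hystrix code.
class DeferredFallbackSketch {
    // Either a value or an error, never both.
    static final class Outcome {
        final String value;
        final Throwable error;

        Outcome(String value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    // Nothing is executed here; the fallback only runs when get() is called on the result.
    static Supplier<Outcome> defer(Supplier<String> fallback) {
        return () -> {
            try {
                return new Outcome(fallback.get(), null);   // like Observable.just(...)
            } catch (RuntimeException ex) {
                return new Outcome(null, ex);               // like Observable.error(...)
            }
        };
    }
}
```

Turning the exception into a value is what allows a later stage, such as the onErrorResumeNext(handleFallbackError) step in the chain, to decide what to do with a failed fallback.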
getFallback is the entry point for invoking the fallback method. MethodExecutionAction uses reflection to call the method we configured.
@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}
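To make the "invoked via reflection" step concrete, here is a small sketch using java.lang.reflect directly. The UserService class, its findUser and findUserFallback methods, and the invokeByName helper are all hypothetical names chosen for this illustration; the real lookup of the fallback method and its arguments is done by the library and is more involved.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical illustration of calling a fallback method by name.
class ReflectiveFallbackSketch {
    // Made-up service: findUser is the guarded method, findUserFallback its fallback.
    public static class UserService {
        public String findUser(String id) {
            throw new IllegalStateException("remote call failed");
        }

        public String findUserFallback(String id) {
            return "default-user-" + id;
        }
    }

    // Look the method up by name and invoke it with the original argument.
    static Object invokeByName(Object target, String methodName, String arg) {
        try {
            Method method = target.getClass().getMethod(methodName, String.class);
            return method.invoke(target, arg);
        } catch (InvocationTargetException e) {
            // The invoked method itself threw: surface its exception as the cause.
            throw new IllegalStateException(e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Try the real method; if it fails, call the fallback with the same argument.
    static Object callWithFallback(UserService service, String id) {
        try {
            return invokeByName(service, "findUser", id);
        } catch (IllegalStateException e) {
            return invokeByName(service, "findUserFallback", id);
        }
    }
}
```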
Semaphore isolation
Having seen the circuit-open code above, this case is simple: it calls getFallbackOrThrowException in the same way.
private Observable<R> handleSemaphoreRejectionViaFallback() {
    // rest omitted
    return getFallbackOrThrowException(this, HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION,
            "could not acquire a semaphore for execution", semaphoreRejectionException);
}
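Both rejection handlers differ only in the arguments they pass to one shared helper. A minimal sketch of that design, assuming a hypothetical FallbackHelperSketch class with a made-up Reason enum and error message format (neither is taken from Hystrix):

```java
import java.util.function.Supplier;

// Hypothetical illustration of several handlers funnelling into one helper.
class FallbackHelperSketch {
    enum Reason { SHORT_CIRCUITED, SEMAPHORE_REJECTED }

    // Shared helper: run the fallback, and if the fallback itself fails,
    // report both the original reason and the fallback failure.
    static String fallbackOrThrow(Reason reason, Supplier<String> fallback) {
        try {
            return fallback.get();
        } catch (RuntimeException e) {
            throw new IllegalStateException(reason + " and fallback failed", e);
        }
    }

    static String handleShortCircuit(Supplier<String> fallback) {
        return fallbackOrThrow(Reason.SHORT_CIRCUITED, fallback);
    }

    static String handleSemaphoreRejection(Supplier<String> fallback) {
        return fallbackOrThrow(Reason.SEMAPHORE_REJECTED, fallback);
    }
}
```

Keeping the reason as a parameter means event recording and error reporting live in one place, while each handler stays a one-liner.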
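Thread pool isolation
If the circuit is not open and a semaphore permit is acquired, executeCommandAndObserve is called. Inside it, handleFallback defines how each kind of exception is handled: thread pool rejection, timeout, bad request, and any other failure. The method then calls executeCommandWithSpecifiedIsolation.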
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    // rest omitted
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    // rest omitted
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
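The branching inside handleFallback amounts to a dispatch on the exception type. The sketch below models it with JDK exceptions only, assuming java.util.concurrent.TimeoutException as a stand-in for HystrixTimeoutException and IllegalArgumentException as a stand-in for HystrixBadRequestException; the FailureDispatchSketch class and the returned labels are hypothetical.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the exception dispatch; the stand-in types are assumptions.
class FailureDispatchSketch {
    static String dispatch(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return "thread pool rejected: use fallback";
        } else if (t instanceof TimeoutException) {
            return "timed out: use fallback";
        } else if (t instanceof IllegalArgumentException) {
            // Bad requests are the caller's fault: emit the error, no fallback.
            return "bad request: propagate error";
        } else {
            return "execution failed: use fallback";
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new RejectedExecutionException("queue full")));
        System.out.println(dispatch(new TimeoutException("too slow")));
        System.out.println(dispatch(new IllegalArgumentException("bad input")));
        System.out.println(dispatch(new RuntimeException("anything else")));
    }
}
```

Note the one branch that does not lead to a fallback: a bad request is reported back as an error, which matches the handleBadRequestByEmittingError name in the snippet above.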
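This method does two main things. The first is the subscribeOn call, which is what provides thread pool isolation. The second is the normal invocation, which goes through getUserExecutionObservable; that method is covered after the thread pool discussion.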
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                // rest omitted
                return getUserExecutionObservable(_cmd);
                // rest omitted
            }
        }) // rest omitted
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    }
    // rest omitted
}
Creating the related classes
public Scheduler getScheduler(Func0<Boolean> shouldInterruptThread) {
    // apply the dynamic thread pool configuration
    touchConfig();
    // create a HystrixContextScheduler, which in turn creates a ThreadPoolScheduler
    return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}
This creates a HystrixContextScheduler. Its ThreadPoolScheduler is used to create a ThreadPoolWorker, which is then handed to the HystrixContextSchedulerWorker.
public Worker createWorker() {
    return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}
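The thread pool check happens in the schedule method of HystrixContextSchedulerWorker. If no queue space is available, it throws RejectedExecutionException; how that exception is caught was covered above in handleFallback.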
public Subscription schedule(Action0 action) {
    if (threadPool != null) {
        // this is where the thread pool's queue capacity is checked
        if (!threadPool.isQueueSpaceAvailable()) {
            throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
        }
    }
    return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}
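The "check queue space, otherwise throw RejectedExecutionException, then fall back" flow can be reproduced with a plain ThreadPoolExecutor. This is a simplified sketch, assuming a hypothetical PoolRejectionSketch class that treats a full bounded queue (remainingCapacity() == 0) as the rejection condition; Hystrix's own isQueueSpaceAvailable check is configurable and is not reproduced here.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical model of thread pool rejection leading to a fallback.
class PoolRejectionSketch {
    static String runOrFallback(ThreadPoolExecutor pool, Callable<String> command, String fallback) {
        try {
            // Reject up front when the queue has no room left.
            if (pool.getQueue().remainingCapacity() == 0) {
                throw new RejectedExecutionException("queue is at its rejection threshold");
            }
            return pool.submit(command).get();
        } catch (RejectedExecutionException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    // Saturate a 1-thread, 1-slot pool, then try to run one more command.
    static String demoSaturatedPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // taken directly by the single worker thread
            pool.execute(blocker); // waits in the queue, filling its only slot
            return runOrFallback(pool, () -> "real result", "fallback result");
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demoSaturatedPool());
    }
}
```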
Normal invocation
If all of the checks above pass, getUserExecutionObservable is called. This method eventually calls the run method.
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // rest omitted
    userObservable = getExecutionObservable();
    // rest omitted
}
@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }) // rest omitted
}
Here, too, our method is invoked through reflection.
protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}
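Summary
The sections above walked through the paths that lead to a fallback call, as well as the normal execution path.
[Figure: flowchart of the fallback and normal call paths]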
This is an original work, licensed under Attribution-NonCommercial-NoDerivatives 4.0 International.