【微服务】Hystrix请求执行流程分析
hystrix 请求执行流程源码分析
HystrixCommandAspect:所有注解为HystrixCommand或HystrixCollapser的Http请求方法被拦截,整个请求封装成Command在Hystrix流转,分为Observable和非Observable类型,
底层都是采用RxJava1.x实现异步请求处理,请求执行类型分为:SYNCHRONOUS、ASYNCHRONOUS、OBSERVABLE, OBSERVABLE分为冷(toObservable)和热(observe)两种类型,冷即RxJava Observable被订阅的时候才会发送数据,re是不等订阅就发送数据,
通过HystrixCommand的observableExecutionMode属性决定,默认eagle,即热Observable
举例:正常请求拦截(非Observable同步执行类型)
1. 实际业务处理:com.netflix.hystrix.AbstractCommand.applyHystrixSemantics
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 标识准备开始执行Command,可以直接实现HystrixCommandExecutionHook监听command执行生命周期活动
executionHook.onStart(_cmd);
/* 断路器决定是否放开请求:根据断路器的3种中断来判断 */
if (circuitBreaker.allowRequest()) {
// 获取信号量组件:默认线程池返回TryableSemaphoreNoOp.DEFAULT,若设置成信号量机制则返回TryableSemaphoreActual,并发数又配置:execution.isolation.semaphore.maxConcurrentRequests 决定
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
//创建信号量是否释放标识:原子性
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// 创建RxJava函数,主要用于根据semaphoreHasBeenReleased的信号量释放标识根据CAS操作释放信号量
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
// 创建RxJava函数: 主要用于通知异常事件, 默认HystrixEventNotifierDefault,do nothing
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 尝试获取信号量,类似AQS共享锁实现,state,标识共享的资源个数,即Hystrix信号量机制最大的请求并发数
// 信号量机制:TryableSemaphoreActual
// 线程池机制:TryableSemaphoreNoOp
if (executionSemaphore.tryAcquire()) {
try {
/* 记录command处理开始时间 */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 真正执行具体的command业务逻辑,并利用RxJava监听事件:Error 错误监听,Terminate:命令执行完成监听(成功/失败)、unsubscribe 取消订阅事件释放信号量
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 获取信号量失败,则触发熔断
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器打开,所有请求都被拒绝,触发熔断操作
return handleShortCircuitViaFallback();
}
}
2. 断路器状态(打开、关闭、半开)
断路器状态:
- 打开: 断路器打开,决绝所有请求
- 关闭: 断路器关闭,接收所有请求
- 半开: 只放开1个请求进入,测试目标服务是否可访问,若可行则断路器关闭,接收所有请求
源码:com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl.allowRequest
@Overridepublic boolean allowRequest() {
// 断路器强制打开: 拒绝所有请求
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
// 断路器关闭:返回true,允许放入请求
if (properties.circuitBreakerForceClosed().get()) {
//实际检测断路器状态,若单个时间窗口最大请求数 > circuitBreaker.requestVolumeThreshold 或 错误请求百分比 > circuitBreaker.errorThresholdPercentage
// circuitOpen通过CAS操作标识断路器打开,circuitOpenedOrLastTestedTime 更新最新断路器打开时间,方便下个时间窗口尝试放入请求
isOpen();
return true;
}
// 半开状态:断路器打开,允许放入1个请求去测试
return !isOpen() || allowSingleTest();
}
尝试放入单个请求进入处理
public boolean allowSingleTest() { // 获取最新断路器打开时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 断路器打开、而且过了1个时间窗口,则放入1个请求进入,返回true,标识整个Http请求流程可继续执行
// 时间窗口时间:circuitBreaker.sleepWindowInMilliseconds
// 1) if the circuit is open
// 2) and it"s been longer than "sleepWindow" since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// CAS更新断路器打开时间未保证只有1个请求通过断路器(原子性)
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
3. Hystrix 请求处理逻辑控制
com.netflix.hystrix.AbstractCommand.executeCommandAndObserve- markEmits: RxJava 发射数据监听,主要用于数据统计
- markOnCompleted: RxJava发射数据完成监听,即Hystrix请求处理完成监听
- handleFallback: 执行过程出现异常,进行服务降级处理(RejectedExecutionException、HystrixTimeoutException、HystrixBadRequestException、其他通用异常处理)
- setRequestContext: 设置请求上下文,类似:Zuul RequestContext,Http请求环境
com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation: 按照隔离级别不同进行不同业务处理
com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { // 线程池隔离级别
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// command状态周期:NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL
// 更新command状态:OBSERVABLE_CHAIN_CREATED --> USER_CODE_EXECUTED, 标识要准备执行代码,若状态不对则RxJava抛出错误,Hystrix直接服务熔断处理
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// Hystrix 指标器设置监听的command的相关数据: commandKey(默认方法名)
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
// 超时检测
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
return Observable.error(new RuntimeException("timed out before executing run()"));
}
// 线程状态更新:NOT_USING_THREAD --> STARTED, 标识业务线程准备运行
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//HystrixCountera统计并发线程数 + 1
HystrixCounters.incrementGlobalConcurrentThreads();
// 线程池标记请求:计数器+1,标识线程池运行线程数
threadPool.markThreadExecution();
// 保存正在运行的command,本质存在stack中,后续执行完成后从stack pop
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
// 创建执行结果,标识线程状态正在执行
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
// 事件监听钩子函数
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// 获取用户执行任务的Observable
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() { //任务完成终止操作
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don"t think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() { // 取消订阅操作
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { // 订阅操作
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
// 信号量隔离级别
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn"t throw
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
4. 用户代码执行具体业务
com.netflix.hystrix.AbstractCommand.getUserExecutionObservable
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) { Observable<R> userObservable;
try {
// 获取用户userObservable, single类型,返回单值,本质由用户代码执行
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
com.netflix.hystrix.HystrixCommand.getExecutionObservable: 重写getExecutionObservable()
final protected Observable<R> getExecutionObservable() { return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 返回单值Observable: toObservable().toBlocking().toFuture(); 触发emit操作(com.netflix.hystrix.HystrixCommand.queue)
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
run实现:com.netflix.hystrix.contrib.javanica.command.GenericCommand.run
@Overrideprotected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
//MethodExecutionAction:普通方法执行,非懒加载机制
return getCommandAction().execute(getExecutionType());
}
});
}
com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction.executeWithArgs
@Overridepublic Object executeWithArgs(ExecutionType executionType, Object[] args) throws CommandActionExecutionException {
if(ExecutionType.ASYNCHRONOUS == executionType){
Closure closure = AsyncClosureFactory.getInstance().createClosure(metaHolder, method, object, args);
return executeClj(closure.getClosureObj(), closure.getClosureMethod());
}
return execute(object, method, args);
}
// 反射机制回调,直接执行断路器切入点的具体业务逻辑,涉及服务底层由Ribbon负责与目标服务交互
private Object execute(Object o, Method m, Object... args) throws CommandActionExecutionException {
Object result = null;
try {
m.setAccessible(true); // suppress Java language access
if (isCompileWeaving() && metaHolder.getAjcMethod() != null) {
result = invokeAjcMethod(metaHolder.getAjcMethod(), o, metaHolder, args);
} else {
result = m.invoke(o, args);
}
} catch (IllegalAccessException e) {
propagateCause(e);
} catch (InvocationTargetException e) {
propagateCause(e);
}
return result;
}
以上是 【微服务】Hystrix请求执行流程分析 的全部内容, 来源链接: utcz.com/z/519027.html