【微服务】Hystrix请求执行流程分析

编程

hystrix 请求执行流程源码分析

HystrixCommandAspect:所有注解为HystrixCommand或HystrixCollapser的Http请求方法被拦截,整个请求封装成Command在Hystrix流转,分为Observable和非Observable类型,

底层都是采用RxJava1.x实现异步请求处理,请求执行类型分为:SYNCHRONOUS、ASYNCHRONOUS、OBSERVABLE, OBSERVABLE分为冷(toObservable)和热(observe)两种类型,冷即RxJava Observable被订阅的时候才会发送数据,re是不等订阅就发送数据,

通过HystrixCommand的observableExecutionMode属性决定,默认eagle,即热Observable


举例:正常请求拦截(非Observable同步执行类型)

1. 实际业务处理:com.netflix.hystrix.AbstractCommand.applyHystrixSemantics

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {

// 标识准备开始执行Command,可以直接实现HystrixCommandExecutionHook监听command执行生命周期活动

executionHook.onStart(_cmd);

/* 断路器决定是否放开请求:根据断路器的3种中断来判断 */

if (circuitBreaker.allowRequest()) {

// 获取信号量组件:默认线程池返回TryableSemaphoreNoOp.DEFAULT,若设置成信号量机制则返回TryableSemaphoreActual,并发数又配置:execution.isolation.semaphore.maxConcurrentRequests 决定

final TryableSemaphore executionSemaphore = getExecutionSemaphore();

//创建信号量是否释放标识:原子性

final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);

// 创建RxJava函数,主要用于根据semaphoreHasBeenReleased的信号量释放标识根据CAS操作释放信号量

final Action0 singleSemaphoreRelease = new Action0() {

@Override

public void call() {

if (semaphoreHasBeenReleased.compareAndSet(false, true)) {

executionSemaphore.release();

}

}

};

// 创建RxJava函数: 主要用于通知异常事件, 默认HystrixEventNotifierDefault,do nothing

final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {

@Override

public void call(Throwable t) {

eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);

}

};

// 尝试获取信号量,类似AQS共享锁实现,state,标识共享的资源个数,即Hystrix信号量机制最大的请求并发数

// 信号量机制:TryableSemaphoreActual

// 线程池机制:TryableSemaphoreNoOp

if (executionSemaphore.tryAcquire()) {

try {

/* 记录command处理开始时间 */

executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());

// 真正执行具体的command业务逻辑,并利用RxJava监听事件:Error 错误监听,Terminate:命令执行完成监听(成功/失败)、unsubscribe 取消订阅事件释放信号量

return executeCommandAndObserve(_cmd)

.doOnError(markExceptionThrown)

.doOnTerminate(singleSemaphoreRelease)

.doOnUnsubscribe(singleSemaphoreRelease);

} catch (RuntimeException e) {

return Observable.error(e);

}

} else {

// 获取信号量失败,则触发熔断

return handleSemaphoreRejectionViaFallback();

}

} else {

// 断路器打开,所有请求都被拒绝,触发熔断操作

return handleShortCircuitViaFallback();

}

}

2. 断路器状态(打开、关闭、半开)

断路器状态:

  1. 打开: 断路器打开,决绝所有请求
  2. 关闭: 断路器关闭,接收所有请求
  3. 半开: 只放开1个请求进入,测试目标服务是否可访问,若可行则断路器关闭,接收所有请求

源码:com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl.allowRequest

@Override

public boolean allowRequest() {

// 断路器强制打开: 拒绝所有请求

if (properties.circuitBreakerForceOpen().get()) {

// properties have asked us to force the circuit open so we will allow NO requests

return false;

}

// 断路器关闭:返回true,允许放入请求

if (properties.circuitBreakerForceClosed().get()) {

//实际检测断路器状态,若单个时间窗口最大请求数 > circuitBreaker.requestVolumeThreshold 或 错误请求百分比 > circuitBreaker.errorThresholdPercentage

// circuitOpen通过CAS操作标识断路器打开,circuitOpenedOrLastTestedTime 更新最新断路器打开时间,方便下个时间窗口尝试放入请求

isOpen();

return true;

}

// 半开状态:断路器打开,允许放入1个请求去测试

return !isOpen() || allowSingleTest();

}

尝试放入单个请求进入处理

 public boolean allowSingleTest() {

// 获取最新断路器打开时间

long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();

// 断路器打开、而且过了1个时间窗口,则放入1个请求进入,返回true,标识整个Http请求流程可继续执行

// 时间窗口时间:circuitBreaker.sleepWindowInMilliseconds

// 1) if the circuit is open

// 2) and it"s been longer than "sleepWindow" since we opened the circuit

if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {

// CAS更新断路器打开时间未保证只有1个请求通过断路器(原子性)

if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {

return true;

}

}

return false;

}

3. Hystrix 请求处理逻辑控制

com.netflix.hystrix.AbstractCommand.executeCommandAndObserve

- markEmits: RxJava 发射数据监听,主要用于数据统计

- markOnCompleted: RxJava发射数据完成监听,即Hystrix请求处理完成监听

- handleFallback: 执行过程出现异常,进行服务降级处理(RejectedExecutionException、HystrixTimeoutException、HystrixBadRequestException、其他通用异常处理)

- setRequestContext: 设置请求上下文,类似:Zuul RequestContext,Http请求环境

com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation: 按照隔离级别不同进行不同业务处理

com.netflix.hystrix.AbstractCommand.executeCommandWithSpecifiedIsolation

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {

// 线程池隔离级别

if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {

// mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)

return Observable.defer(new Func0<Observable<R>>() {

@Override

public Observable<R> call() {

executionResult = executionResult.setExecutionOccurred();

// command状态周期:NOT_STARTED, OBSERVABLE_CHAIN_CREATED, USER_CODE_EXECUTED, UNSUBSCRIBED, TERMINAL

// 更新command状态:OBSERVABLE_CHAIN_CREATED --> USER_CODE_EXECUTED, 标识要准备执行代码,若状态不对则RxJava抛出错误,Hystrix直接服务熔断处理

if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {

return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

}

// Hystrix 指标器设置监听的command的相关数据: commandKey(默认方法名)

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

// 超时检测

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {

// the command timed out in the wrapping thread so we will return immediately

// and not increment any of the counters below or other such logic

return Observable.error(new RuntimeException("timed out before executing run()"));

}

// 线程状态更新:NOT_USING_THREAD --> STARTED, 标识业务线程准备运行

if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {

//HystrixCountera统计并发线程数 + 1

HystrixCounters.incrementGlobalConcurrentThreads();

// 线程池标记请求:计数器+1,标识线程池运行线程数

threadPool.markThreadExecution();

// 保存正在运行的command,本质存在stack中,后续执行完成后从stack pop

endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

// 创建执行结果,标识线程状态正在执行

executionResult = executionResult.setExecutedInThread();

/**

* If any of these hooks throw an exception, then it appears as if the actual execution threw an error

*/

try {

// 事件监听钩子函数

executionHook.onThreadStart(_cmd);

executionHook.onRunStart(_cmd);

executionHook.onExecutionStart(_cmd);

// 获取用户执行任务的Observable

return getUserExecutionObservable(_cmd);

} catch (Throwable ex) {

return Observable.error(ex);

}

} else {

//command has already been unsubscribed, so return immediately

return Observable.error(new RuntimeException("unsubscribed before executing run()"));

}

}

}).doOnTerminate(new Action0() { //任务完成终止操作

@Override

public void call() {

if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {

handleThreadEnd(_cmd);

}

if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {

//if it was never started and received terminal, then no need to clean up (I don"t think this is possible)

}

//if it was unsubscribed, then other cleanup handled it

}

}).doOnUnsubscribe(new Action0() { // 取消订阅操作

@Override

public void call() {

if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {

handleThreadEnd(_cmd);

}

if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {

//if it was never started and was cancelled, then no need to clean up

}

//if it was terminal, then other cleanup handled it

}

}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { // 订阅操作

@Override

public Boolean call() {

return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;

}

}));

// 信号量隔离级别

} else {

return Observable.defer(new Func0<Observable<R>>() {

@Override

public Observable<R> call() {

executionResult = executionResult.setExecutionOccurred();

if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {

return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));

}

metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);

// semaphore isolated

// store the command that is being run

endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());

try {

executionHook.onRunStart(_cmd);

executionHook.onExecutionStart(_cmd);

return getUserExecutionObservable(_cmd); //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn"t throw

} catch (Throwable ex) {

//If the above hooks throw, then use that as the result of the run method

return Observable.error(ex);

}

}

});

}

}

4. 用户代码执行具体业务

com.netflix.hystrix.AbstractCommand.getUserExecutionObservable

 private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {

Observable<R> userObservable;

try {

// 获取用户userObservable, single类型,返回单值,本质由用户代码执行

userObservable = getExecutionObservable();

} catch (Throwable ex) {

// the run() method is a user provided implementation so can throw instead of using Observable.onError

// so we catch it here and turn it into Observable.error

userObservable = Observable.error(ex);

}

return userObservable

.lift(new ExecutionHookApplication(_cmd))

.lift(new DeprecatedOnRunHookApplication(_cmd));

}

com.netflix.hystrix.HystrixCommand.getExecutionObservable: 重写getExecutionObservable()

final protected Observable<R> getExecutionObservable() {

return Observable.defer(new Func0<Observable<R>>() {

@Override

public Observable<R> call() {

try {

// 返回单值Observable: toObservable().toBlocking().toFuture(); 触发emit操作(com.netflix.hystrix.HystrixCommand.queue)

return Observable.just(run());

} catch (Throwable ex) {

return Observable.error(ex);

}

}

}).doOnSubscribe(new Action0() {

@Override

public void call() {

// Save thread on which we get subscribed so that we can interrupt it later if needed

executionThread.set(Thread.currentThread());

}

});

}

run实现:com.netflix.hystrix.contrib.javanica.command.GenericCommand.run

@Override

protected Object run() throws Exception {

LOGGER.debug("execute command: {}", getCommandKey().name());

return process(new Action() {

@Override

Object execute() {

//MethodExecutionAction:普通方法执行,非懒加载机制

return getCommandAction().execute(getExecutionType());

}

});

}

com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction.executeWithArgs

 @Override

public Object executeWithArgs(ExecutionType executionType, Object[] args) throws CommandActionExecutionException {

if(ExecutionType.ASYNCHRONOUS == executionType){

Closure closure = AsyncClosureFactory.getInstance().createClosure(metaHolder, method, object, args);

return executeClj(closure.getClosureObj(), closure.getClosureMethod());

}

return execute(object, method, args);

}

// 反射机制回调,直接执行断路器切入点的具体业务逻辑,涉及服务底层由Ribbon负责与目标服务交互

private Object execute(Object o, Method m, Object... args) throws CommandActionExecutionException {

Object result = null;

try {

m.setAccessible(true); // suppress Java language access

if (isCompileWeaving() && metaHolder.getAjcMethod() != null) {

result = invokeAjcMethod(metaHolder.getAjcMethod(), o, metaHolder, args);

} else {

result = m.invoke(o, args);

}

} catch (IllegalAccessException e) {

propagateCause(e);

} catch (InvocationTargetException e) {

propagateCause(e);

}

return result;

}

以上是 【微服务】Hystrix请求执行流程分析 的全部内容, 来源链接: utcz.com/z/519027.html

回到顶部