陈同学
微服务
Accelerator
About
# Spring Cloud 源码学习之 Hystrix 隔离策略 本文学习了 Hystrix 隔离策略的源码。 ## 简介 隔离是一种常见的风险控制(保护)手段,举几个小例子: * **森林防火阻隔带**:在森林失火时阻止火势蔓延 * **传染病隔离病区**:既有利于病人治疗,也有利于阻止感染健康人群 * **自然保护区**:保护珍稀动植物 Hystrix 也使用了隔离策略,称之为 **bulkhead pattern**,翻译为:舱壁隔离模式。舱壁将船体内部空间划分成多个舱室,将舱与舱之间严密分开,在航行中,即使有舱破损进水,水也不会流到其他舱。 Hystrix 的隔离指使用 bulkhead 模式来隔离各个依赖服务之间的调用,同时限制对各个依赖服务的并发访问,使得各依赖服务之间互不影响。 ## 隔离方式 Hystrix 提供了两种隔离方式。 ### Thread Pools(线程池) 将各依赖服务的访问交由独立的线程池来处理,会为每个依赖服务创建一个线程池。 虽然可以起到很好的隔离作用,但也增加了计算开销,每个命令的执行都涉及到queueing、scheduling、context switching ### Semaphores(信号量) 通过为各依赖服务设置信号量(或计数器)来限制并发调用,相当于对各依赖服务做限流。信号量模式下任务由当前线程直接处理,不涉及到线程切换,自然也就没有超时控制。 ## 源码学习 [Spring Cloud 源码学习之 Hystrix 工作原理](https://chenyongjun.vip/articles/88) 一文中介绍了 Hystrix 的原理,隔离策略在executeCommandWithSpecifiedIsolation()方法中进行实现。 ```java // 以设定的隔离策略执行command private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { ... // 获取包裹实际Task的Observable return getUserExecutionObservable(_cmd); ... } } }).doOnTerminate(...) .doOnUnsubscribe(...) // 以指定的线程执行 .subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {...})); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { ... return getUserExecutionObservable(_cmd); ... } }); } } ``` 信号量模式在当前线程执行,线程池模式核心代码是这句: ```java subscribeOn(threadPool.getScheduler(new Func0<Boolean>()... ``` 使用Scheduler来处理当前任务,且看看threadPool.getScheduler是如何处理的? threadPool是command的普通属性: ``` protected final HystrixThreadPool threadPool; ``` 初始化方式如下: ```java private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { if (fromConstructor == null) { return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); } else { return fromConstructor; } } ``` 每个threadPoolKey会维护一个HystrixThreadPool。 ```java static class Factory { final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>(); static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) { String key = threadPoolKey.name(); // this should find it for all but the first time HystrixThreadPool previouslyCached = threadPools.get(key); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize synchronized (HystrixThreadPool.class) { if (!threadPools.containsKey(key)) { threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder)); } } return threadPools.get(key); } ``` 在 new HystrixThreadPoolDefault() 时,最终使用的是ThreadPoolExecutor。 在Hystrix的工作机制中,在判断熔断之后,准备执行任务前,会先判断信号量拒绝和线程池拒绝的情况,如下: ```java private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { if (circuitBreaker.attemptExecution()) { // 获取信号量 final TryableSemaphore executionSemaphore = getExecutionSemaphore(); ... // 尝试获取执行信号,能否执行当前命令 if (executionSemaphore.tryAcquire()) { ... } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } } ``` 只有在隔离策略为SEMAPHORE时,才会创建TryableSemaphoreActual,否则返回一个什么也不做的TryableSemaphoreNoOp(tryAcquire()将永远返回true)。 ```java protected TryableSemaphore getExecutionSemaphore() { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) { ... } else { // return NoOp implementation since we're not using SEMAPHORE isolation return TryableSemaphoreNoOp.DEFAULT; } } ``` 再看看TryableSemaphoreActual ```java static class TryableSemaphoreActual implements TryableSemaphore { protected final HystrixProperty<Integer> numberOfPermits; private final AtomicInteger count = new AtomicInteger(0); public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) { // 每个HystrixCommandKey默认信号量数量,默认10 this.numberOfPermits = numberOfPermits; } @Override public boolean tryAcquire() { int currentCount = count.incrementAndGet(); // 如果信号量超过设定的信号量,则启动信号量拒绝 if (currentCount > numberOfPermits.get()) { count.decrementAndGet(); return false; } else { return true; } } ... } ```
本文由
cyj
创作,可自由转载、引用,但需署名作者且注明文章出处。
文章标题:
Spring Cloud 源码学习之 Hystrix 隔离策略
文章链接:
https://chenyongjun.vip/articles/95
扫码或搜索 cyjrun 关注微信公众号, 结伴学习, 一起努力