陈同学
微服务
Accelerator
About
# Spring Cloud 源码学习之 Hystrix 请求缓存 本文学习了 Hystrix 请求缓存机制。 ## 场景 先用一个小场景演示下请求缓存。 向 **服务A** 查询一页数据,共10条,每条都有一个orgId字段,需要根据orgId向 **服务B** 查询orgName。10条数据中orgId有8条相同,剩余2条相同。 下面写下伪代码: **方式一:循环10次**: ```java for (org : 10条数据) { org.setOrgName(向服务B获取orgName); } ``` 服务间调用,内网调用,走HTTP的话,即使每个请求50-100ms,10个请求也有0.5到1s,耗时非常久。 **方式二:人工缓存** ```java Map<String, String> organizations = new HashMap<>(10); for (org : 10条数据) { if (organizations.containsKey(org.getOrgId)) { // 从缓存中读取 org.setOrgName(organizations.get(org.getOrgId)); } else { // 远程调用B服务 org.setOrgName(向服务B获取orgName); // 加入缓存 organizations.put(org.getOrgId, org.getOrgName); } } ``` 这样只需要调用2次B服务,耗时在100-200毫秒之间,性能提升5倍。但这样做真的好吗? 微服务中,服务之间的依赖非常多,如果每个方法都自行处理缓存的话,应用中可以想象有多少累赘的缓存代码。 **方式三:自动缓存** 这属于本文的主题,在请求生命周期内,无论是当前线程,还是其他线程,只要请求相同的数据,就自动做缓存,不侵入业务代码。 ## ReplaySubject 自动缓存的实现方式有多种,这里介绍 Hystrix 的实现方式。Hystrix 使用了 RxJava 中的 ReplaySubject。 replay 译为重放,Subject 是个合体工具,既可以做数据发射器(被观察者、Observable),也可以做数据消费者(观察者、Observer)。 看个小例子就明白: ```java @Test public void replaySubject() { ReplaySubject<Integer> replaySubject = ReplaySubject.create(); replaySubject.subscribe(v -> System.out.println("订阅者1:" + v)); replaySubject.onNext(1); replaySubject.onNext(2); replaySubject.subscribe(v -> System.out.println("订阅者2:" + v)); replaySubject.onNext(3); replaySubject.subscribe(v -> System.out.println("订阅者3:" + v)); } ``` 输出结果(换行由手工添加): ``` 订阅者1:1 订阅者1:2 订阅者2:1 订阅者2:2 订阅者1:3 订阅者2:3 订阅者3:1 订阅者3:2 订阅者3:3 ``` 可以看出,无论是 replaySubject 多久前发射的数据,新的订阅者都可以收到所有数据。类比一下:一位大V,提供订阅服务,任何人任何时候订阅,大V都会把以前的所有资料发你一份。 **请求缓存用的就是 ReplaySubject 这个特性,如果请求相同数据,就把原先的结果发你一份**。 ## 请求缓存的实现 > 在 [Spring Cloud 源码学习之 Hystrix 工作原理](https://chenyongjun.vip/articles/88) 一文中,有 Hystrix 的全流程源码介绍。 这是AbstractCommand.toObservable()中关于请求缓存的源码。请求缓存有2个条件,一是启用了请求缓存,二是有cacheKey。 ```java public Observable<R> toObservable() { final AbstractCommand<R> _cmd = this; ... return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call() { ... final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); // 启用了requestCache, 则尝试从缓存中获取 if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null) { isResponseFromCache = true; // 从缓存中获取数据 return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; // 启用缓存而且有cacheKey if (requestCacheEnabled && cacheKey != null) { // 使用HystrixCachedObservable来包装Obervable,并且添加到requestCache中 HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); ... afterCache = toCache.toObservable(); } ... } }); } ``` 整个逻辑还是非常简单的,在启用缓存的前提后,有缓存则读缓存,没缓存则缓存结果供下次使用。 再看下HystrixRequestCache,用于缓存的工具。 ``` Cache that is scoped to the current request as managed by HystrixRequestVariableDefault. This is used for short-lived caching of HystrixCommand instances to allow de-duping of command executions within a request. 缓存仅在请求范围内使用,主要用途是减少HystrixCommand实例的执行次数(缓存结果后执行次数自然少了) ``` HystrixRequestCache实例的存储是由自身的静态变量搞定,未提供public的构造器,通过 **getInstance()** 的静态方法来获取与cacheKey对应的实例。 ```java public class HystrixRequestCache { private final static ConcurrentHashMap<RequestCacheKey, HystrixRequestCache> caches = new ConcurrentHashMap<RequestCacheKey, HystrixRequestCache>(); } public static HystrixRequestCache getInstance(HystrixCommandKey key, HystrixConcurrencyStrategy concurrencyStrategy) { return getInstance(new RequestCacheKey(key, concurrencyStrategy), concurrencyStrategy); } ``` 存储HystrixCachedObservable的数据结构是ConcurrentHashMap,cacheKey作为key,HystrixCachedObservable为value。 ```java private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() { @Override public ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> initialValue() { return new ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>(); } ... }); ``` 再看看缓存的结果HystrixCachedObservable,这个就用到了上面提过的ReplaySubject。将一个普通的Observable包装成了HystrixCachedObservable。 ```java public class HystrixCachedObservable<R> { protected final Subscription originalSubscription; protected final Observable<R> cachedObservable; private volatile int outstandingSubscriptions = 0; protected HystrixCachedObservable(final Observable<R> originalObservable) { ReplaySubject<R> replaySubject = ReplaySubject.create(); // 订阅普通的Observable, 拿到其中的数据 this.originalSubscription = originalObservable .subscribe(replaySubject); this.cachedObservable = replaySubject... } ... // 将cachedObservable作为数据源提供出去, 完成普通Observable向ReplaySubject的转换 public Observable<R> toObservable() { return cachedObservable; } } ``` 因此,command执行一次拿到结果来自于ReplaySubject。后续无论有多少次订阅(即执行command),都把已有的结果推送一次,从而达到缓存请求结果的效果。 ## 如何使用缓存的结果 以HystrixCommand的 queue() 方法为例: ```java public Future<R> queue() { // 调用 toObservable 拿到数据源 final Future<R> delegate = toObservable().toBlocking().toFuture(); ... } ``` 在toFuture()中会订阅这个数据源: ```java public static <T> Future<T> toFuture(Observable<? extends T> that) { final CountDownLatch finished = new CountDownLatch(1); final AtomicReference<T> value = new AtomicReference<T>(); final AtomicReference<Throwable> error = new AtomicReference<Throwable>(); // 首先,通过single()确保从Observable中拿到单个结果. 然后订阅数据源 @SuppressWarnings("unchecked") final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() { @Override public void onNext(T v) { // 拿到执行的结果后放到AtomicReference中 value.set(v); } }); return new Future<T>() { private volatile boolean cancelled; // 返回执行结果 @Override public T get() throws InterruptedException, ExecutionException { finished.await(); return getValue(); } }; } ``` 由于toObservable()拿到的是一个ReplaySubject,下次命令再次执行时,订阅ReplaySubject后,可以直接拿到之前已有的结果。 ## 缓存的生命周期 缓存是request scope,在同一个请求范围内,所有线程都可以使用相同cacheKey已缓存的结果。 为什么是request scope呢?**在数据动态变化的情况下,即使参数相同,多次请求访问时,缓存也没有意义**。所以只在同一个请求下使用。 下面是个小例子: ```java public class HystrixCommandCacheTest extends HystrixCommand<String> { private final String value; public HystrixCommandCacheTest(String value) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.value = value; } // 将 value 参数作为key, 模拟请求的参数 @Override protected String getCacheKey() { return value; } @Override protected String run() throws Exception { return "hello," + value; } public static void main(String[] args) { // 第一个请求环境 HystrixRequestContext context1 = HystrixRequestContext.initializeContext(); HystrixCommandCacheTest cmd1 = new HystrixCommandCacheTest("kitty"); System.out.println("cmd1结果:" + cmd1.execute() + ";使用缓存:" + cmd1.isResponseFromCache()); // 模拟10个相同请求参数的命令执行 for (int i = 0; i < 10; i++) { HystrixCommandCacheTest tempCmd = new HystrixCommandCacheTest("kitty"); System.out.println("第" + i + " 次执行:" + tempCmd.execute() + ";使用缓存:" + tempCmd.isResponseFromCache()); } context1.shutdown(); // 第二个请求环境 HystrixRequestContext context2 = HystrixRequestContext.initializeContext(); HystrixCommandCacheTest cmd2 = new HystrixCommandCacheTest("kitty"); System.out.println("cmd2结果:" + cmd2.execute() + ";使用缓存:" + cmd2.isResponseFromCache()); } } ``` 输出结果如下: ``` cmd1结果:hello,kitty;使用缓存:false 第0 次执行:hello,kitty;使用缓存:true 第1 次执行:hello,kitty;使用缓存:true 第2 次执行:hello,kitty;使用缓存:true 第3 次执行:hello,kitty;使用缓存:true 第4 次执行:hello,kitty;使用缓存:true 第5 次执行:hello,kitty;使用缓存:true 第6 次执行:hello,kitty;使用缓存:true 第7 次执行:hello,kitty;使用缓存:true 第8 次执行:hello,kitty;使用缓存:true 第9 次执行:hello,kitty;使用缓存:true cmd2结果:hello,kitty;使用缓存:false ``` 第一次没有缓存,后面10次执行都用了第一次的执行结果。第二次请求时没有缓冲可用。 ## 小结 利用缓存可以极大的提升性能,"天下武功,唯快不破"。 如何练就一门快功夫呢?方式有多种,举两个小例子: * 速度再快比不上近水楼台,直接用应用缓存肯定比网络通讯获取数据快得多 * 利用各类缓存"神器",比如Redis,人家就是快。 为了提升性能,从用户发起请求的那一刻起,链路上的各类角色就在各显神通了,例如: * 浏览器缓存静态资源;提供LocalStorage这种缓存结构,单页面应用可直接使用 * 请求进入网络后,利用CDN,优先从地理位置较近的地方拉取资源 * 请求到达目表网络后,可以从代理中读取缓存数据(如nginx缓存) * 请求达到应用后,应用直接从内存中获取数据,如:Map、Guava等 * 分布式缓存,例如使用Redis提供缓存,减少对DB的直接访问
本文由
cyj
创作,可自由转载、引用,但需署名作者且注明文章出处。
文章标题:
Spring Cloud 源码学习之 Hystrix 请求缓存
文章链接:
https://chenyongjun.vip/articles/93
扫码或搜索 cyjrun 关注微信公众号, 结伴学习, 一起努力