cyj
微服务
Accelerator
About
# Spring Cloud 源码学习之 Hystrix Metrics 收集 > 文中源码基于 Spring Cloud **Finchley.SR1**、Spring Boot **2.0.6.RELEASE**. > > Hystrix 其他文章:[Spring Cloud 源码学习之 Hystrix 入门](https://chenyongjun.vip/articles/75)、[Spring Cloud 源码学习之 Hystrix 工作原理](https://chenyongjun.vip/articles/88)、[Spring Cloud 之 Hystrix 跨线程传递数据](https://chenyongjun.vip/articles/83) 在 Hystrix Command 执行过程(开始执行、结束执行、异常、超时)时会不断发出各类事件,通过收集这些数据,提供给消费者。如断路器、Hystrix Dashboard可以统计分析这些数据,从而完成特定的功能。 本文学习了 Metrics 收集的源码,并整理成下图。由于 Hystrix 发出的事件种类很多,本文仅以命令结束执行事件作为学习实例。  ## Subject简述 Hystrix 基于 RxJava,本文涉及到 Subject 概念,这里提一下 **rx.subjects.Subject**。 ```java public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {} ``` Subject 继承Observable,因此可作为被观察者、数据源,也就是一个数据发射器; 实现了接口 Observer,因此可作为观察者,可以订阅其他Observable,处理Observable发射出的数据。 **因此,Subject既可以发射数据,也可以接收数据。类比于菜鸟驿站,可以收、发快递**。 ## Metrics 收集流程 整个过程分成以下三步: ### 1.使用HystrixCommandMetrics记录metrics 每个Command的构造器中会获取一个HystrixCommandMetrics工具,用来记录metrics。 ```java // 构造器利用HystrixCommandMetrics获取命令key对应的对象 HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties); // HystrixCommandMetrics 中存储HystrixCommandMetrics的数据结构 private static final ConcurrentHashMap<String, HystrixCommandMetrics> metrics; ``` 也就是说,每个CommandKey会拥有一个对应的HystrixCommandMetrics工具。 例如:A服务利用Feign远程调用B服务,那下面的 **service-B** 会自动作为命令的key。 ```java @FeignClient(name = "service-B") ``` 下面是利用HystrixCommandMetrics工具发射 **标记命令结束** 的事件代码: ```java void markCommandDone(...) { HystrixThreadEventStream.getInstance().executionDone(...); } ``` ### 2.Per-Thread 事件处理者 HystrixCommandMetrics提供了基础工具方法给Command使用,而HystrixCommandMetrics的实际使用的是**HystrixThreadEventStream: Per-thread event stream**。 它是线程级别的数据处理者,每个线程拥有自己的HystrixThreadEventStream,**HystrixThreadEventStream.getInstance()** 是从ThreadLocal中获取对象。 它包含了很多Subject<事件,事件>,用来接收和发射数据。下面是HystrixThreadEventStream类: ```java public class HystrixThreadEventStream { // Per-thread 的HystrixThreadEventStream private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams; // 用来接收和发射HystrixCommandCompletion事件的Subject private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlyCommandCompletionSubject; } ``` HystrixCommandCompletion是事件(HystrixCommandEvent)的一种,writeOnlyCommandCompletionSubject这个Subject的初始化方式如下: ```java // 创建为一个数据发射器 writeOnlyCommandCompletionSubject = PublishSubject.create(); writeOnlyCommandCompletionSubject .onBackpressureBuffer() // 绑定发射数据时的处理者 .doOnNext(writeCommandCompletionsToShardedStreams) .unsafeSubscribe(Subscribers.empty()); ``` writeCommandCompletionsToShardedStreams会怎么处理数据呢?下面是它的定义: ```java // 它是一个可执行的实体,没有返回值,可以传入一个参数; 和 Runnable很像 private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() { // 当接收到数据时, 又将数据发送给了command级别的处理者 @Override public void call(HystrixCommandCompletion commandCompletion) { // 获取CommandKey对应的HystrixCommandCompletionStream HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); // 写入数据 commandStream.write(commandCompletion); ... } }; ``` 现在再回过来看HystrixThreadEventStream这个**Per-thread**的工具发射 **标记命令结束事件** 的代码: ```java public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { // 构建命令结束的数据对象 HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); // 利用上面的Subject发射数据, onNext()就是发射一条数据。 writeOnlyCommandCompletionSubject.onNext(event); } ``` 由于writeOnlyCommandCompletionSubject绑定了数据处理者(上面的writeCommandCompletionsToShardedStreams这个Action1)。它会利用command级别的工具来发射数据。 ### 3.Per-Command 事件处理者 通过上一步知道,每个线程有自己的工具(HystrixThreadEventStream)来处理数据,最终这个工具利用了命令级别的工具。上面的**HystrixCommandCompletionStream** 属于 **HystrixEventStream** 的一种,HystrixEventStream专门用于处理command级别的数据,它有如下几个子类: ```java HystrixCommandCompletionStream HystrixCommandStartStream HystrixThreadPoolCompletionStream HystrixThreadPoolStartStream HystrixCollapserEventStream ``` 这几个子类都是用来处理特定类型事件的工具,以HystrixCommandCompletionStream为例子,这些子类的结构都很类似,可以接收数据,并将数据提供给其他消费者。 ```java public class HystrixCommandCompletionStream { // 一个用于接收和发射结束事件的Subject private final Subject<HystrixCommandCompletion, HystrixCommandCompletion> writeOnlySubject; // 一个Observable,将接收到的数据作为数据源发射给其他消费者 private final Observable<HystrixCommandCompletion> readOnlyStream; } ``` 先看看这个**Per-Command** 的对象是怎么创建的? ```java // 存储结构 private static final ConcurrentMap<String, HystrixCommandCompletionStream> streams = new ConcurrentHashMap<String, HystrixCommandCompletionStream>(); // 单例模式拿到HystrixCommandCompletionStream,以命令的key为索引存储在ConcurrentMap中 public static HystrixCommandCompletionStream getInstance(HystrixCommandKey commandKey) { HystrixCommandCompletionStream initialStream = streams.get(commandKey.name()); if (initialStream != null) { return initialStream; } else { synchronized (HystrixCommandCompletionStream.class) { ... } } } ``` 下面是它的构造函数: ```java HystrixCommandCompletionStream(final HystrixCommandKey commandKey) { this.commandKey = commandKey; // 创建可以发射数据的Subject this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create()); // readOnlyStream是一个Observable, share()方法可以将上面Subject发射的数据全部广播给readOnlyStream,相当于拷贝了一份一模一样的数据 this.readOnlyStream = writeOnlySubject.share(); } ``` 这个类提供了很重要的两个方法: ```java // 提供了接收数据的方法,其他工具(如HystrixThreadEventStream)可以将数据写进来 public void write(HystrixCommandCompletion event) { writeOnlySubject.onNext(event); } // 实现HystrixEventStream的observe(方法), 其他消费者可以利用observe()拿到这个数据源,然后订阅它,处理它发射的所有数据 @Override public Observable<HystrixCommandCompletion> observe() { return readOnlyStream; } ``` ### 小结 通过上面三步,数据流向就很清楚了: * Command直接使用HystrixCommandMetrics来记录命令开始、结束等事件 * HystrixCommandMetrics利用线程级别的HystrixThreadEventStream的来接收数据 * HystrixThreadEventStream完成各种事件的封装(如将结束事件封装成HystrixCommandCompletion),再利用command级别的HystrixEventStream来接收数据(HystrixEventStream有不同的子类来处理不同的事件) * 最终消费者通过HystrixEventStream的observe()方法,拿到这个数据源,然后订阅它,从而源源不断的拿到Command发射出的各种数据 ## 谁在最终消费数据? 通过上述步骤,将Hystrix Command执行过程的各种信息转化成了特定数据结构的事件,然后提供了一个Observable作为数据源。如果需要使用这些数据,各观察者只需要订阅Observable就可以拿到这些已经分门别类且结构化的数据了。 例如:断路器就是利用这些信息,然后统计分析数据,最终提供断路器的功能。 本文不深入断路器,仅关注各项事件的收集过程中的数据流向。下一遍文章将分享断路器是如何利用这些基础数据,如何使用滑动窗口的原理来处理数据,感兴趣可以关注奥。 ## 附录 ### HystrixEvent HystrixEvent是一个事件标记接口,其子类都是些特定数据结构的数据对象。像HystrixThreadEventStream会封装这个事件。  ### HystrixEventStream HystrixEventStream各子类提供了write()方法供其他对象写入HystrixEvent,然后再提供observe()方法,供其他消费者来消费这些数据。 
本文由
cyj
创作,可自由转载、引用,但需署名作者且注明文章出处。
文章标题:
Spring Cloud 源码学习之 Hystrix Metrics 收集
文章链接:
https://chenyongjun.vip/articles/89
扫码或搜索 cyjrun 关注微信公众号, 结伴学习, 一起努力