陈同学
微服务
Accelerator
About
# Spring Cloud 源码学习之 Ribbon 本文学习了Ribbon的设计及源码的主体内容,对具体的负载均衡策略未进行拓展学习。 基于 Spring Boot **2.0.6.RELEASE**、Spring Cloud **Finchley.SR1** ## 简述 在 Spring Cloud 中,Ribbon 作为客户端负载均衡组件。 负载均衡(Load Balance) ,负载指承载的流量,均衡指以一定的策略将流量转发到处理节点,通过负载均衡的方式以提高整体的处理能力。 下面是 nginx 负载均衡配置的例子,假设将 api.example.com 的流量转发到本机8080、8081、8082端口的程序,算法为 **ip_hash**。 ```nginx upstream appserver { ip_hash; server 127.0.0.1:8080; server 127.0.0.1:8081; server 127.0.0.1:8082; } server { listen 80; server_name api.example.com; location / { proxy_pass http://appserver; } } ``` nginx 的例子可以知晓负载均衡的主要组成: * 服务提供方(server) * 负责均衡策略(ip_hash) * 负载均衡能力提供者(nginx,将流量根据策略转发到server) Ribbon也类似,对应的对象分别为: * Server:服务提供方 * IRule:负载均衡规则、策略 * LoadBalancer:负载均衡器,组织整个负载均衡的过程 当然,整个负载均衡过程还会有一些工具性质的组件,例如: * ServerListUpdater: 用于更新server列表,起到服务发现的作用 * LoadBalancerStats:收集转发流量时的处理情况,为负载均衡策略提供数据支撑 ## 核心组成 先介绍其组成部分。 ### Server server 即处理节点,不过多介绍。 ```java public class Server { public static final String UNKNOWN_ZONE = "UNKNOWN"; private String host; private int port = 80; private String scheme; private volatile String id; private volatile boolean isAliveFlag; private String zone = UNKNOWN_ZONE; private volatile boolean readyToServe = true; } ``` ### 负载均衡策略 Ribbon 使用 IRule 接口描述了LB策略的行为,含选择server、get/setLB的能力。 ```java public interface IRule{ // 选择一个 Server, 如访问/api/user/users, key为user, 返回 host:port 的server public Server choose(Object key); // 设置与获取 LoadBalancer public void setLoadBalancer(ILoadBalancer lb); public ILoadBalancer getLoadBalancer(); } ``` 下面是LB策略类图。  著名的策略如:RoundRobinRule(轮询)、WeightedResponseTimeRule(基于响应时间)。 下面稍微简述几种策略: * RandomRule(随机策略):随机选择server,不推荐使用,同时Ribbon对该策略的实现也比较差 * RoundRobinRule(轮询策略):轮询优点是可将流量均衡的转发到server * WeightedResponseTimeRule(响应时间作权重策略):根据收集到的响应时间作为权重,响应时间越短被选中的概率越大 * BestAvailableRule(最佳可用策略):选择当前并发请求最低的server,且会跳过熔断中的server 各策略的源码分析可参考 [程序员DD:负载均衡策略源码分析](https://www.jianshu.com/p/e89c693f19d2) IRule 的抽象实现类 AbstractLoadBalancerRule 持有了 ILoadBalancer 即负载均衡器。 ```java public abstract class AbstractLoadBalancerRule implements IRule { private ILoadBalancer lb; } ``` ### 负载均衡器 Ribbon 使用 ILoadBalancer 定义负载均衡器的行为,LoadBalancer 负责组织整个负载均衡过程,自然需要 **管理server、根据Rule选择Server、转发请求**。 ```java public interface ILoadBalancer { // 维护servers public void addServers(List<Server> newServers); // 利用负载均衡策略选择server public Server chooseServer(Object key); // 下线server public void markServerDown(Server server); // 查询可用的servers public List<Server> getReachableServers(); // 查询所有servers public List<Server> getAllServers(); } ``` 先看看LB的类图。  #### AbstractLoadBalancer 作为所有LB的抽象父类,对ILoadBalancer稍微做了拓展,如定义了一个ServerGroup、提供根据ServerGroup获取ServerList的能力,更为重要的是提供获取LB统计数据(Stats表示Statistics即统计数据)的能力,这些统计数据将作为IRule的支撑。 ```java public abstract class AbstractLoadBalancer implements ILoadBalancer { public enum ServerGroup{ ALL, STATUS_UP, STATUS_NOT_UP} public Server chooseServer() { return chooseServer(null); } public abstract List<Server> getServerList(ServerGroup serverGroup); public abstract LoadBalancerStats getLoadBalancerStats(); } ``` #### BaseLoadBalancer LB的基础实现类,它的成员变量非常重要,是负载均衡能力的支撑。 ```java public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { // 默认策略为 "轮询" private final static IRule DEFAULT_RULE = new RoundRobinRule(); protected IRule rule = DEFAULT_RULE; // 利用ping来检测server活性 protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY; protected IPing ping = null; // 存储所有server和UP的server protected volatile List<Server> allServerList = ... protected volatile List<Server> upServerList = ... // 获取LB的统计数据 protected LoadBalancerStats lbStats; // 负载均衡行为的各类配置, 如:readTimeout private IClientConfig config; } ``` 在创建LoadBalancer时,需要给它提供以上能力以完成负载均衡。下面看看两个实际的LB子类。 #### DynamicServerListLoadBalancer DynamicServerList 顾名思义,提供了动态获取server的能力,支持在运行时变更serverlist. 没有Zone概念,所有instances将当作一个Zone,如果发生跨区域访问,会有性能问题。例如:上海、广州两个区域的实例都在注册中心上,如果上海的实例RPC访问广州的实例,相对直接访问上海的实例肯定有一定的网络传输时间消耗。 由于提供了Dynamic能力,需要 ServerListFilter、ServerListUpdater 来辅助过滤、更新serverlist。 ServerListUpdater 有两个子类: - PollingServerListUpdater:通过定时任务的方式进行服务列表的更新 - EurekaNotificationServerListUpdater:利用Eureka的事件监听器来驱动服务列表的更新操作 #### ZoneAwareLoadBalancer > 以下内容翻译自类的注释,未做任何实践。 LB在choose server 时可以以Zone(区域)为单位,而衡量一个Zone情况的关键指标是平均活跃请求数(Average Active Requests,AAR),指标通过每个Zone中各rest client数据聚合而成,具体的计算方式是:Zone中总请求数/可用instance数量(不包含熔断的instance),这种计算方式对于Zone中发生超时时非常有效。 LoadBalancer将计算和检查所有可用区的统计信息,如果某Zone的AAR达到配置的阈值,Zone将从active server list 中剔除。一旦超过一个Zone达到阈值,AAR值最大的Zone将被剔除。 ## 组织转发请求 上面介绍了一些概念,接下来看看LB具体如何提供负载均衡能力。 ### 请求执行客户端 请求的执行最终会落到各类HTTP客户端上,先看看Ribbon中提供的客户端。  IClient 接口仅定义了 execute() 接口,IClientConfig 作为配置信息以控制执行行为。 ```java public interface IClient<S extends ClientRequest, T extends IResponse> { public T execute(S request, IClientConfig requestConfig) throws Exception; } ``` LoadBalancerContext 提供LB的上下文,其本身便持有 ILoadBalancer 属性。 ```java public class LoadBalancerContext implements IClientConfigAware { private ILoadBalancer lb; } ``` 而 AbstractLoadBalancerAwareClient 名字可拆为两部分,一是LoadBalancerAware,二是Client,即具备负载均衡器的Client,它负责将LoadBalancer与Client进行结合。 其executeWithLoadBalancer() 先将请求包装成LoadBalancerCommand结合,然后执行请求、等待结果(**这里包含了通过策略选取server**)。 ```java public abstract class AbstractLoadBalancerAwareClient extends LoadBalancerContext implements IClient<S, T>, IClientConfigAware { public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { // 包装成Command, 会通过 LoadBalancer来获取Server LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { // 返回一个 Observable return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { // 调用具体Client的execute()方法 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } }) // 同步获取结果 .toBlocking() .single(); } } } ``` RibbonLoadBalancingHttpClient、RetryableRibbonLoadBalancingHttpClient、OkHttpLoadBalancingClient、RetryableOkHttpLoadBalancingClient都是作为具体的实现类。 以RibbonLoadBalancingHttpClient为例,有两个重要方法: ```java public class RibbonLoadBalancingHttpClient extends AbstractLoadBalancingClient<RibbonApacheHttpRequest, RibbonApacheHttpResponse, CloseableHttpClient> { // createDelegate() 提供具体执行类,这里是Apache HttpClient 的 CloseableHttpClient protected CloseableHttpClient createDelegate(IClientConfig config) { RibbonProperties ribbon = RibbonProperties.from(config); ... } // execute() 实现了 IClient,并通过delegate来实际执行 @Override public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { IClientConfig config = configOverride != null ? configOverride : this.config; RibbonProperties ribbon = RibbonProperties.from(config); ... // 通过 delegate 执行 final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } } ``` 通过上面介绍,具体的执行过程就比较清楚了。 * RibbonLoadBalancingHttpClient 这样的具体子类实现IClient的execute()方法,具体执行的客户端委派(delegate)给HttpClient * 上级抽象类 AbstractLoadBalancerAwareClient 提供了executeWithLoadBalancer()能力,实际上是将任务包裹成Command来执行,实际执行的还是execute()方法 * LoadBalancerContext 为客户端提供上下文,上下文中持有了ILoadBalancer 自然就具备负载均衡的能力 ### RibbonCommand 上面介绍了执行请求的具体Client,而 excute() 方法包裹在RibbonCommand中。 RibbonCommand和HystrixCommand类似,都拥有 **HystrixExecutable** 能力,而 HystrixExecutable 定义了 execute()、queue()、observer() 几个基础方法。因此,RibbonCommand 也是可以应用在整个Hystrix体系之中。 ```java public interface RibbonCommand extends HystrixExecutable<ClientHttpResponse> { } public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {} public interface HystrixExecutable<R> extends HystrixInvokable<R> { public R execute(); public Future<R> queue(); public Observable<R> observe(); } ``` 下面看一下类图。 左边是工厂类,右边是具体的Command,各Factory可以提供Command.  AbstractRibbonCommand 的抽象父类非常重要,如下: ```java public abstract class AbstractRibbonCommand<LBC extends AbstractLoadBalancerAwareClient<RQ, RS>, RQ extends ClientRequest, RS extends HttpResponse> extends HystrixCommand<ClientHttpResponse> implements RibbonCommand { private static final Log LOGGER = LogFactory.getLog(AbstractRibbonCommand.class); protected final LBC client; protected RibbonCommandContext context; protected IClientConfig config; // 实现HystrixCommand run() @Override protected ClientHttpResponse run() throws Exception { final RequestContext context = RequestContext.getCurrentContext(); RQ request = createRequest(); RS response; // Client 是否支持重试 boolean retryableClient = ... // 根据 retryable 情况分别进入 execute()、executeWithLoadBalancer()入口 if (retryableClient) { response = this.client.execute(request, config); } else { response = this.client.executeWithLoadBalancer(request, config); } context.set("ribbonResponse", response); if (this.isResponseTimedOut()) { if (response != null) { response.close(); } } return new RibbonHttpResponse(response); } } ``` 可以看出,它提供了如下能力: * LBC属性: LBC 是AbstractLoadBalancerAwareClient类型,因此拥有执行请求的client * extend HystrixCommand,而HystrixCommand 对 命令的execute()、queue()、observer()提供了实现 * IClientConfig属性:拥有执行请求的配置信息 * 实现了HystrixCommand的run()方法,作为Command执行入口 HttpClientRibbonCommand、OkHttpRibbonCommand则作为利用第三方组件实现的子类。 **至此,通过层层传递,所有能力都汇聚到了RibbonCommand上面,只要创建一个RibbonCommand,就能完成整个负载均衡的过程**. 整理后如下图:  其中,execute()、executeWithLoadBalancer()都是具备LB能力的执行入口,不过execute()支持重试,而后者直接执行。 ### 源码分析 下面以Zuul转发请求为场景,分享Ribbon的源码处理过程。 #### 入口 Zuul的 **spring.factories** 如下: ```properties org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration,\ org.springframework.cloud.netflix.zuul.ZuulProxyAutoConfiguration ``` ZuulProxyAutoConfiguration中提供了对RibbonCommand的支持,包含了对RestClient、OkHttp、HttpClient的支持。 ```java @Configuration @Import({ RibbonCommandFactoryConfiguration.RestClientRibbonConfiguration.class, RibbonCommandFactoryConfiguration.OkHttpRibbonConfiguration.class, RibbonCommandFactoryConfiguration.HttpClientRibbonConfiguration.class, HttpClientConfiguration.class }) @ConditionalOnBean(ZuulProxyMarkerConfiguration.Marker.class) public class ZuulProxyAutoConfiguration extends ZuulServerAutoConfiguration { } ``` 默认是HttpClient,HttpClientRibbonConfiguration中注入了HttpClientRibbonCommandFactory。 ```java @Configuration @ConditionalOnRibbonHttpClient protected static class HttpClientRibbonConfiguration { @Bean @ConditionalOnMissingBean public RibbonCommandFactory<?> ribbonCommandFactory( SpringClientFactory clientFactory, ZuulProperties zuulProperties) { // 返回工厂类 return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, zuulFallbackProviders); } } ``` 工厂类用于根据参数创建HttpClientRibbonCommand,有了它就可以完成整个负载均衡的过程。 ```java public class HttpClientRibbonCommandFactory extends AbstractRibbonCommandFactory { @Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { FallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId()); final String serviceId = context.getServiceId(); // 创建Client final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient( serviceId, RibbonLoadBalancingHttpClient.class); // 创建并设置LB, LB拥有IRule client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, clientFactory.getClientConfig(serviceId)); } } ``` #### Zuul 转交请求 下面看看Zuul结合Ribbon转发的过程,Zuul使用RibbonRoutingFilter来进行请求转发,作为整个程序运行的入口。 先看其核心run()方法,构建了RibbonCommandContext(上下文),然后调用了forward. ```java public class RibbonRoutingFilter extends ZuulFilter { // 在应用启动时已注入HttpClientRibbonCommandFactory protected RibbonCommandFactory<?> ribbonCommandFactory; @Override public Object run() { RequestContext context = RequestContext.getCurrentContext(); try { // 构建Ribbon命令上下文(包含请求headers、params、body、serviceId、retryable等属性) RibbonCommandContext commandContext = buildCommandContext(context); ClientHttpResponse response = forward(commandContext); setResponse(response); return response; } } } ``` forward创建了RibbonCommand,然后将请求交给了RibbonCommand. ```java protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception { RibbonCommand command = this.ribbonCommandFactory.create(context); try { ClientHttpResponse response = command.execute(); return response; } } ``` 此时,RibbonRoutingFilter 将等待 Ribbon 完成请求处理。 #### 执行 RibbonCommand 创建RibbonCommand 需要做很多准备工作,如初始化IClient、ILoadBalancer等,如下: ```java @Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { final String serviceId = context.getServiceId(); // 创建Client用于执行请求 final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient( serviceId, RibbonLoadBalancingHttpClient.class); // 封装LoadBalancer,IRule client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, clientFactory.getClientConfig(serviceId)); } ``` command.execute() 实际调用 HystrixCommand.run(),这是个抽象方法,由实现类提供run()的实现。 ```java public abstract class HystrixCommand<R> extends ... { protected abstract R run() throws Exception; } ``` 在Ribbon中,由AbstractRibbonCommand提供了run的实现,将以负载均衡的方式执行请求。 ```java @Override protected ClientHttpResponse run() throws Exception { // 未允许retry时,默认采取 RibbonLoadBalancingHttpClient boolean retryableClient = ... // 两个execute方法都提供LB能力, 作为两个入口 if (retryableClient) { // 由具备retry能力的client执行 response = this.client.execute(request, config); } else { response = this.client.executeWithLoadBalancer(request, config); } return new RibbonHttpResponse(response); } ``` executeWithLoadBalancer()由父类AbstractLoadBalancerAwareClient提供。 ```java public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { // 创建LoadBalancerCommand, 这是一个独立的类, 与Hystrix无关 LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { // 调用命令的submit, 返回一个Observable, 参数是一个ServerOperation return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { // 注意: ServerOperation函数会执行具体client的execute()方法 return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } }) .toBlocking() .single(); } } ``` LoadBalancerCommand.submit()会返回一个Observable,submit的参数是ServerOperation. 它和Java8提供的函数式接口 **Function** 是一样的。接收一个Server参数,返回Observable结果。 ```java public interface ServerOperation<T> extends Func1<Server, Observable<T>> { @Override public Observable<T> call(Server server); } ``` LoadBalancerCommand.submit()函数的实现是LB的核心,需要对RxJava有一定的了解,下面截取一部分代码(丢弃了retry部分)。 ```java public Observable<T> submit(final ServerOperation<T> operation) { // Use the load balancer Observable<T> o = // 使用一定的策略选择Server(下面会独立介绍) (server == null ? selectServer() : Observable.just(server)) // 使用了RxJava能力,通过应用一个函数返回一个新的Observable .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(Server server) { Observable<T> o = Observable .just(server) .concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { // 一层层的套下来,在拿到server后, 最后调用operation.call() return operation.call(server).doOnEach(...)} }); return o; } }); } ``` 特别注意上面的 **operation.call()**,这是LoadBalancerCommand.submit()提供的参数,会执行具体Client的execute()方法,默认也就是RibbonLoadBalancingHttpClient的execute()方法。 #### selectServer 再来看看选取server的策略,下面这一行。 ```java (server == null ? selectServer() : Observable.just(server)) ``` 需要执行 selectServer() 才能确定具体的server。selectServer() 通过LoadBalancer获取server. ```java private Observable<Server> selectServer() { // 定义了获取server的程序 return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { // 通过LB获取server Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); } ``` loadBalancerContext.getServerFromLoadBalancer的逻辑如下,通过LB chooseServer. ```java public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException { String host = null; ... ILoadBalancer lb = getLoadBalancer(); if (host == null) { if (lb != null){ // 通过负载均衡策略选择server Server svc = lb.chooseServer(loadBalancerKey); if (svc == null){ throw new ClientException(...); } return svc; } else { ... } else { return new Server(host, port); } ``` ## 小结 本文分享了Ribbon的主体处理流程,整个设计层层递进。 * 提供为选择server提供了各类算法 * 提供了多种LoadBalancer来提供负载均衡能力 * 使用IClient来负责实际的执行,提供常见组件(RestClient、OkHttp、HttpClient)默认实现,也可以自行拓展其他组件 * 使用Hystrix来守护整个执行过程 整个设计拓展性非常好。
本文由
cyj
创作,可自由转载、引用,但需署名作者且注明文章出处。
文章标题:
Spring Cloud 源码学习之 Ribbon
文章链接:
https://chenyongjun.vip/articles/122
扫码或搜索 cyjrun 关注微信公众号, 结伴学习, 一起努力