Cluster 模块的目标是暴露 Invoker 对象,实现统一的调用入口
@SPI(FailoverCluster.NAME) //默认扩展点public interface Cluster { @Adaptive//基于 Directory ,创建 Invoker 对象Invoker join(Directory directory) throws RpcException;}复制代码
我们看下类图
从上图可以看到,每一个Cluster实现都对应一个Invoker实现。
那么我们从哪里入手分析呢?
当服务消费者启动时,在获取代理时,会加入集群路由。这里出现 FailoverCluster 。
FailoverCluster
//它实现Cluster。调用失败时自动切换。会切换到其他服务器。但重试会带来延迟,要设定重试次数。通常用于读操作public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override publicInvoker join(Directory directory) throws RpcException { return new FailoverClusterInvoker (directory);//这里出现了FailoverClusterInvoker }}复制代码
//AbstractClusterInvoker
我们先看FailoverClusterInvoker的基类。
public abstract class AbstractClusterInvokerimplements Invoker { protected final Directory directory; protected final boolean availablecheck; private AtomicBoolean destroyed = new AtomicBoolean(false); private volatile Invoker stickyInvoker = null; public AbstractClusterInvoker(Directory directory) { this(directory, directory.getUrl()); } public AbstractClusterInvoker(Directory directory, URL url) { this.directory = directory; this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK); }}//list获得所有服务提供者 Invoker 集合protected List > list(Invocation invocation) throws RpcException { return directory.list(invocation);} /** 使用 LoadBalance 选择 invoker. LoadBalance 提供负责均衡策略 invokers 候选的 Invoker 集合 selected 已选过的 Invoker 集合. 注意:输入保证不重复 */ //从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象 protected Invoker select(LoadBalance loadbalance, Invocation invocation, List > invokers, List > selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); // 获得 sticky 配置项,方法级 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, "sticky", false); { // 若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } // 若开启粘滞连接的特性,且 stickyInvoker 不存在于 selected 中,则返回 stickyInvoker 这个 Invoker 对象 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } }//该方法主要处理粘滞特性,具体选择逻辑在doSelect // 执行选择 Invoker invoker = doSelect(loadbalance, invocation, invokers, selected); // 若开启粘滞连接的特性,记录最终选择的 Invoker 到 stickyInvoker if (sticky) { stickyInvoker = invoker; } return invoker; } //从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象 private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List > invokers, List > selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 1.如果只有一个 Invoker ,直接选择 if (invokers.size() == 1) return invokers.get(0); //2.使用 Loadbalance ,选择一个 Invoker 对象 Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);//10 // 如果 selected中包含 或者 不可用&&availablecheck=true 则重新选择 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try {//3.重新选一个 Invoker 对象 Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //看下第一次选的位置,如果不是最后,选+1位置. int index = invokers.indexOf(invoker); try { //最后在避免碰撞 invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; } @Override//调用服务提供者的逻辑 public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed();// 校验是否销毁 // binding attachments into invocation. Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 List > invokers = list(invocation);//-->进入6 7 //获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8 // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。) return doInvoke(invocation, invokers, loadbalance); } @Override//调用服务提供者的逻辑 public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed();// 校验是否销毁 // binding attachments into invocation. Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 List > invokers = list(invocation);//-->进入6 7 //获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8 // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。) return doInvoke(invocation, invokers, loadbalance); }复制代码
FailoverClusterInvoker
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List > copyinvokers = invokers; checkInvokers(copyinvokers, invocation);// 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常 String methodName = RpcUtils.getMethodName(invocation); // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2 int len = getUrl().getMethodParameter(methodName, "retries", 2) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // 保存最后一次调用的异常 // 保存已经调用过的Invoker List > invoked = new ArrayList >(copyinvokers.size()); // invoked invokers. Set providers = new HashSet (len); // failover机制核心实现:如果出现调用失败,那么重试其他服务器 for (int i = 0; i < len; i++) { if (i > 0) { //i > 0进行重新选择,避免重试时,候选 Invoker 集合,已发生变化。 checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 根据负载均衡机制从copyinvokers中选择一个Invoker Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);//9 // 保存每次调用的Invoker invoked.add(invoker); // 设置已经调用的 Invoker 集合,到 Context 中 RpcContext.getContext().setInvokers((List) invoked); try {// RPC 调用得到 Result Result result = invoker.invoke(invocation);//- // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出 if (le != null && logger.isWarnEnabled()) {//le 非空,说明此时是重试调用成功 logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // 如果是业务性质的异常,不再重试,直接抛出. throw e; } le = e;// 其他性质的异常统一封装成RpcException } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally {//保存已经调用的网络地址集合。 providers.add(invoker.getUrl().getAddress()); } } // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息 throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); }复制代码
总结的时序图