博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo集群之Cluster模块
阅读量:7120 次
发布时间:2019-06-28

本文共 8271 字,大约阅读时间需要 27 分钟。

Cluster 模块的目标是暴露 Invoker 对象,实现统一的调用入口

@SPI(FailoverCluster.NAME) //默认扩展点public interface Cluster {    @Adaptive//基于 Directory ,创建 Invoker 对象    
Invoker
join(Directory
directory) throws RpcException;}复制代码

我们看下类图

从上图可以看到,每一个Cluster实现都对应一个Invoker实现。

那么我们从哪里入手分析呢?

当服务消费者启动时,在获取代理时,会加入集群路由。这里出现 FailoverCluster 。

FailoverCluster

//它实现Cluster。调用失败时自动切换。会切换到其他服务器。但重试会带来延迟,要设定重试次数。通常用于读操作public class FailoverCluster implements Cluster {    public final static String NAME = "failover";    @Override    public 
Invoker
join(Directory
directory) throws RpcException { return new FailoverClusterInvoker
(directory);//这里出现了FailoverClusterInvoker }}复制代码

//AbstractClusterInvoker

我们先看FailoverClusterInvoker的基类。

public abstract class AbstractClusterInvoker
implements Invoker
{ protected final Directory
directory; protected final boolean availablecheck; private AtomicBoolean destroyed = new AtomicBoolean(false); private volatile Invoker
stickyInvoker = null; public AbstractClusterInvoker(Directory
directory) { this(directory, directory.getUrl()); } public AbstractClusterInvoker(Directory
directory, URL url) { this.directory = directory; this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK); }}//list获得所有服务提供者 Invoker 集合protected List
> list(Invocation invocation) throws RpcException { return directory.list(invocation);} /** 使用 LoadBalance 选择 invoker. LoadBalance 提供负责均衡策略 invokers 候选的 Invoker 集合 selected 已选过的 Invoker 集合. 注意:输入保证不重复 */ //从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象 protected Invoker
select(LoadBalance loadbalance, Invocation invocation, List
> invokers, List
> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); // 获得 sticky 配置项,方法级 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, "sticky", false); { // 若 stickyInvoker 不存在于 invokers 中,说明不在候选中,需要置空,重新选择 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } // 若开启粘滞连接的特性,且 stickyInvoker 不存在于 selected 中,则返回 stickyInvoker 这个 Invoker 对象 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } }//该方法主要处理粘滞特性,具体选择逻辑在doSelect // 执行选择 Invoker
invoker = doSelect(loadbalance, invocation, invokers, selected); // 若开启粘滞连接的特性,记录最终选择的 Invoker 到 stickyInvoker if (sticky) { stickyInvoker = invoker; } return invoker; } //从候选的 Invoker 集合,选择一个最终调用的 Invoker 对象 private Invoker
doSelect(LoadBalance loadbalance, Invocation invocation, List
> invokers, List
> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 1.如果只有一个 Invoker ,直接选择 if (invokers.size() == 1) return invokers.get(0); //2.使用 Loadbalance ,选择一个 Invoker 对象 Invoker
invoker = loadbalance.select(invokers, getUrl(), invocation);//10 // 如果 selected中包含 或者 不可用&&availablecheck=true 则重新选择 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try {//3.重新选一个 Invoker 对象 Invoker
rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //看下第一次选的位置,如果不是最后,选+1位置. int index = invokers.indexOf(invoker); try { //最后在避免碰撞 invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; } @Override//调用服务提供者的逻辑 public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed();// 校验是否销毁 // binding attachments into invocation. Map
contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 List
> invokers = list(invocation);//-->进入6 7 //获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8 // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。) return doInvoke(invocation, invokers, loadbalance); } @Override//调用服务提供者的逻辑 public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed();// 校验是否销毁 // binding attachments into invocation. Map
contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获得所有服务提供者 Invoker 集合 List
> invokers = list(invocation);//-->进入6 7 //获得 LoadBalance 对象 LoadBalance loadbalance = initLoadBalance(invokers, invocation);//8 // 设置调用编号,若是异步调用 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // 执行调用(子Cluster的Invoker实现类的服务调用的差异逻辑。) return doInvoke(invocation, invokers, loadbalance); }复制代码

FailoverClusterInvoker

public Result doInvoke(Invocation invocation, final List
> invokers, LoadBalance loadbalance) throws RpcException { List
> copyinvokers = invokers; checkInvokers(copyinvokers, invocation);// 检查copyinvokers即可用Invoker集合是否为空,如果为空,那么抛出异常 String methodName = RpcUtils.getMethodName(invocation); // 得到最大可调用次数:最大可重试次数+1,默认最大可重试次数Constants.DEFAULT_RETRIES=2 int len = getUrl().getMethodParameter(methodName, "retries", 2) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // 保存最后一次调用的异常 // 保存已经调用过的Invoker List
> invoked = new ArrayList
>(copyinvokers.size()); // invoked invokers. Set
providers = new HashSet
(len); // failover机制核心实现:如果出现调用失败,那么重试其他服务器 for (int i = 0; i < len; i++) { if (i > 0) { //i > 0进行重新选择,避免重试时,候选 Invoker 集合,已发生变化。 checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 根据负载均衡机制从copyinvokers中选择一个Invoker Invoker
invoker = select(loadbalance, invocation, copyinvokers, invoked);//9 // 保存每次调用的Invoker invoked.add(invoker); // 设置已经调用的 Invoker 集合,到 Context 中 RpcContext.getContext().setInvokers((List) invoked); try {// RPC 调用得到 Result Result result = invoker.invoke(invocation);//- // 重试过程中,将最后一次调用的异常信息以 warn 级别日志输出 if (le != null && logger.isWarnEnabled()) {//le 非空,说明此时是重试调用成功 logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // 如果是业务性质的异常,不再重试,直接抛出. throw e; } le = e;// 其他性质的异常统一封装成RpcException } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally {//保存已经调用的网络地址集合。 providers.add(invoker.getUrl().getAddress()); } } // 最大可调用次数用完还得到Result的话,抛出RpcException异常:重试了N次还是失败,并输出最后一次异常信息 throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); }复制代码

总结的时序图

转载于:https://juejin.im/post/5bf2cdea51882579cf011e89

你可能感兴趣的文章
Webform中<%%>
查看>>
一些简单的配置
查看>>
Effective C++ 精要(第五部分:实现)
查看>>
DIV+CSS颜色边框背景等样式
查看>>
HDU 1018 Big Number【斯特林公式/log10 / N!】
查看>>
nefu 115
查看>>
drf版本控制 和django缓存,跨域问题,
查看>>
SVN环境搭建详解(来源网络)
查看>>
设备驱动基础学习--字符驱动实现
查看>>
sourceinsight安装记录
查看>>
PHP函数索引-F
查看>>
数组[]
查看>>
C++学习之基本概念
查看>>
el captain设置环境变量
查看>>
Educational Codeforces Round 37 A B C
查看>>
UVA 129 Krypton Factor(DFS 回溯)
查看>>
MongoDB可视化工具Studio 3T的使用
查看>>
Android卡片设置透明度失效问题
查看>>
Python 利用*args和**kwargs解决函数遇到不确定数量参数问题
查看>>
线段树(知识概念)
查看>>