首页 > 技术文章 > dubbo系列四、dubbo集群容错和负载均衡

zhangyjblogs 2021-07-28 23:46 原文

dubbo集群容错和负载均衡

1.前言

在微服务环境中,为了保证服务的高可用,很少会有单点服务出现,服务通常都是以集群的形式出现。在dubbo远程调用过程中,被调用的远程服务并不是每时刻都保持良好的状态,当某个服务调用出现异常时候(比如网络抖动、服务短暂不可用),都需要进行容错,就需要到了集群容错机制。

dubbo的cluster层包含Cluster、Directory、Router、LoadBalance几大核心接口。注意需要区分Cluster层和Cluster接口,Cluster层是抽象概念,表示的是对外的整个集群容错层;Cluster是容错接口,提供FailOver、FailFast等容错机制。

在dubbo请求过程中,通过methodName从注册表RegistryDirectory获取Invoker集合,接着使用路由对Invoker进行过滤,接着使用负载均衡从过滤后的Invoker集合获取一个Invoker,对此Invoker进行执行调用。下图来自官网

image-20210714003620760

在这个过程中,用到了集群容错机制、负载均衡、路由规则,本篇文章记录下集群和负载均衡。

2.集群容错生成

我们先看下dubbo调用使用,consumer端的Invoker的结构:MockClusterWrapper->FailoverClusterInvoker->DubboInvoker,其中MockClusterWrapper和FailoverClusterInvoker都是Cluster,MockClusterWrapper是个wrapper类(用于功能增强),忽略。那么实际就是通过FailoverClusterInvoker进行集群容错的。那么FailoverClusterInvoker是如何生成的呢?

答案在com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)方法的Invoker invoker = cluster.join(directory);,

此cluster是自适应对象Cluster$Adaptive,根据directory.getUrl()RegistryDirectory.overrideDirectoryUrl上的cluster参数获取对应的XxxCluster,然后创建对应的XxxClusterInvoker,默认是failover。那么RegistryDirectory.overrideDirectoryUrl是如何生成的呢?是在com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)操作内,overrideDirectoryUrl是个zk协议,里面的参数是由consumerUrl和动态配置内的override协议上的参数merge而来。因此要指定cluster有3种途径:1.在provider进行配置,比如dubbo.provider.cluster=failfast或dubbo.service.cluster=failfast;2.在consumer进行配置dubbo.consumer.cluster=failfast或dubbo.reference.cluster=failfast,consumer端配置会覆盖provider配置;3.通过服务治理平台(比如dubbo-admin)进行动态配置集群。三种配置优先级依次增加。

3.集群容错介绍

3.1.集群容错介绍

dubbo容错机制能增强整个应用的鲁棒性,容错过程对上层用户是完全透明的,但用户可以通过不同的配置项来选择不同的容错机制。每种容错机制又有自己个性化的配置项。列举如下:

策略名称 优点 缺点 主要应用场景
Failover 对调用者屏蔽调用失败的信息 增加RT,额外资源开销,资源浪费 对调用rt不敏感的场景
Failfast 业务快速感知失败状态进行自主决策 产生较多报错的信息 非幂等性操作,需要快速感知失败的场景
Failsafe 即使失败了也不会影响核心流程 对于失败的信息不敏感,需要额外的监控 旁路系统,失败不影响核心流程正确性的场景
Failback 失败自动异步重试 重试任务可能堆积 对于实时性要求不高,且不需要返回值的一些异步操作
Forking 并行发起多个调用,降低失败概率 消耗额外的机器资源,需要确保操作幂等性 资源充足,且对于失败的容忍度较低,实时性要求高的场景
Broadcast 支持对所有的服务提供者进行操作 资源消耗很大 通知所有提供者更新缓存或日志等本地资源信息

3.2.默认的集群容错分析

集群策略默认是failover,平时工作中使用的基本也是这个策略。下面分析下这个

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

代码很简单,生成一个Invoker对象FailoverClusterInvoker,容错逻辑都在FailoverClusterInvoker,下面接着看

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;//最大调用次数,即重试次数+1
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. //保存已经调用的Invoker,如果调用失败,下次则不再选取到已经调用到的Invoker
    Set<String> providers = new HashSet<String>(len);//保存已经调用的Invoker地址,用于日志打印
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();//调用过程可能consumer被关闭,做下检查
            copyinvokers = list(invocation);//重新选取,可能服务发生了变化
            // check again
            checkInvokers(copyinvokers, invocation);
        }
        Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);//负载均衡选取一个Invoker 
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);//rpc调用
            if (le != null && logger.isWarnEnabled()) {
                logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                           + invocation.getMethodName() + " in the service " + getInterface().getName()
                           + ". Tried " + len + " times of the providers " + providers
                           + " (" + providers.size() + "/" + copyinvokers.size()
                           + ") from the registry " + directory.getUrl().getAddress()
                           + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                           + Version.getVersion() + ". Last error is: "
                           + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}

failover默认是三次调用

第一次调用:使用invoked集合保存本次调用的Invoker,用于调用失败,下次调用避免再次选取到上次失败的Invoker。providers保存本次调用的地址,用于日志打印。第一次调用成功,则直接返回结果。调用不成功,使用进入第二次调用。

第二次调用:使用list()操作重新获取可用的服务Invoker集合,然后根据负载均衡重新选取一个Invoker,只是会剔除到第一次失败的Invoker。把第二次调用的Invoker保存到调用集合invoked,然后进行rpc调用,调用成功,返回结果(并打印warn日志,提示第一次失败的服务地址给用户),调用失败,进入第三次调用。

第三次调用,使用list()操作重新获取可用的服务Invoker集合,然后根据负载均衡重新选取一个Invoker,只是会剔除到第一次和第二次失败的Invoker。把第三次调用的Invoker保存到调用集合invoked,然后进行rpc调用,调用成功,返回结果,调用失败,则抛出异常给用户。

过程中,核心就是invoked这个集合,用于保存已经调用的Invoker,在调用失败重试时候,负载均衡选取Invoker就会剔除已经调用的服务。

3.3.粘滞连接策略分析

如果配置了dubbo.reference.sticky=true 或者 方法上被注解@Method(sticky=true),则启用了粘滞连接,粘滞连接优先级高于负载均衡策略。代码分析如下

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);//粘滞连接获取
    {
        //ignore overloaded method
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {//服务列表不包含stickyInvoker,说明原调用的stickyInvoker已经失效,因此置null
            stickyInvoker = null;
        }
        //ignore concurrency problem
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {//selected == null 说明没发起调用,!selected.contains(stickyInvoker)即已经调用的invoker集合不包含这个黏性Invoker 
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);//负载均衡选取一个Invoker

    if (sticky) {//首次调用,更新stickyInvoker为本次调用的invoker
        stickyInvoker = invoker;
    }
    return invoker;
}

首次调用:stickyInvoker是个volatile线程安全,首次调用,虽然开启了粘滞,但是stickyInvoker为null,因此使用负载均衡选取一个Invoker保存为stickyInvoker。

后面调用:stickyInvoker已经存在,因此在开启了粘滞后,直接返回stickyInvoker。

动态服务列表发生变化,在调用时候,stickyInvoker已经不存在与动态服务集合,因此给stickyInvoker置null,使用负载均衡重新选取Invoker作为stickyInvoker。

3.4.自定义集群

自定义集群很简单,实现Cluster接口即可,比如MyCluster,然后在resources下创建dubbo/internal/com.alibaba.dubbo.rpc.cluster.Cluster文件,文件内容为my=MyCluster即可,同时定义一个MyClusterInvoker 继承AbstractClusterInvoker,通过配置dubbo.reference.cluster=my即可生效。

4.负载均衡

4.1.负载均衡配置

配置负载均衡有以下几种方式:

provider端配置:全局配置 dubbo.provider.loadbalance="random" 或者 dubbo.service.loadbalance="random";单独一个provider端服务配置@Service(loadbalance="random") ;单独其中一个方法配置@Method(loadbalance="random")

consumer端配置:全局配置 dubbo.consumer.loadbalance="random" 或者 dubbo.reference.loadbalance="random";单独一个consumer端配置@Reference(loadbalance="random") ;单独其中一个方法配置@Method(loadbalance="random")

多个配置是有覆盖关系的, 配置的优先级是:

  1. 客户端方法级别配置;(最优先)

  2. 客户端接口级别配置;

  3. 服务端方法级别配置;

  4. 服务端接口级别配置;(最后使用)

注意: 虽说以上配置有全封闭服务端配置的,有针对客户端配置的,但是,真正使负载均衡起作用的是,客户端在发起调用的时候,使用相应负载均衡算法进行选择调用。(服务端不可能有这能力)

4.2.负载均衡策略

负载均衡接口是LoadBalance,抽象类AbstractLoadBalance是模板模式,有四种负载均衡策略

Random LoadBalance

随机,按权重设置随机概率。是dubbo默认负载均衡

在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin LoadBalance

轮询,按公约后的权重设置轮询比率。

存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive LoadBalance

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

ConsistentHash LoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

算法参见:http://en.wikipedia.org/wiki/Consistent_hashing

缺省只对第一个参数 Hash,如果要修改,配置 <dubbo:parameter key="hash.arguments" value="0,1" />

缺省用 160 份虚拟节点,如果要修改,配置 <dubbo:parameter key="hash.nodes" value="320" />

4.3.负载均衡通用逻辑

负载均衡也是使用了模板模式,公共逻辑在AbstractLoadBalance,具体是getWeight和calculateWarmupWeight,其中getWeight我们可以重写。看下这两个公共逻辑方法

protected int getWeight(Invoker<?> invoker, Invocation invocation) {//protected方法,通常让子类用于重写
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);//获取invoker权重,默认是100
    if (weight > 0) {
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);//获取启动时间戳,默认0
        if (timestamp > 0L) {
            int uptime = (int) (System.currentTimeMillis() - timestamp);//启动时间
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);//获取预热时间,默认10min
            if (uptime > 0 && uptime < warmup) {//启动时间小于预热时间,计算权重
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

calculateWarmupWeight存在的意义在于,框架考虑到服务刚启动时需要有一个预热过程,如果一启动就给与100%的流量,则可能会导致服务崩溃,因此实现了 calculateWarmupWeight 方法用于计算预热时候的权重,计算逻辑是:(启动至今实际/给予的预热总实际/权重),比如:A服务的权重是5,让它预热10min,则第一分钟的时候,它的权重是1/(10/5)=0.5,0.5/5=0.1,也就是承担10%的流量;10min后,权重变为10/(10/5)=5,5/5=1,权重变为设置的100%,承担了所有流量。

getWeight的逻辑:获取权重,如果启动时间大于10min,则直接返回获取的权重(设置的权重或100);启动时间小于10min,则根据预热时间计算出一个权重返回。

4.4.随机算法

RandomLoadBalance是默认的负载均衡策略,基本使用这个居多,分析下这个源码

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // invoker数量
    int totalWeight = 0; // 权重和
    boolean sameWeight = true; //每个invoker权重是否相同的标识
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);//父类公共逻辑获取权重
        totalWeight += weight; // 每个Invoker权重累加到totalWeight
        if (sameWeight && i > 0
            && weight != getWeight(invokers.get(i - 1), invocation)) {//当前invoker权重和上一个invoker权重不同,则设置sameWeight=false,说明权重不同
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {//权重不同
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = random.nextInt(totalWeight);//新版改用了ThreadLocalRandom,性能更好。 offset,根据总权重计算出一个随机的偏移量
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);//权重不同,根据偏移量获取找到对应的invoker
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    return invokers.get(random.nextInt(length));//权重相同,说明所有invoker概率都相同,直接返回一个随机Invoker
}

稍微难理解的就是offset了,以一个例子更容易明白,假设有4个invoker,权重分别是1、2、3、4,则总权重是1+2+3+4=10。说明每个Invoker分别有1/10、2/10、3/10、4/10的概率被选中。然后nextInt(10)会返回0~10之间的一个整数,假设是5.那么进行累减,5-4=1,接着1-3=-2小于0,此时会落入到3的区间,即选中3好的invoker,如下图

image-20210714234224210

4.5.负载均衡扩展

自定义负载均衡,实现LoadBalance接口或者继承AbstractLoadBalance即可,比如MyLoadBalance,然后在resources下创建/dubbo/internal/com.alibaba.dubbo.rpc.cluster.LoadBalance文件,文件内容为my=MyLoadBalance即可通过配置@Reference(loadbalance="my")即可生效。

5.Merger的实现

当一个接口有不同实现时候,消费者又需要同时引用不同的实现时,可以用group来区分不同的实现。如果需要并行调用不同group的服务,并且要把结果集合并起来,则需要用到Merger特性。Merger实现了多个服务调用后结果合并的逻辑。dubbo对于这种直接封装到了框架中,简化了业务开发的复杂度。具体就是MergeableCluster和MergeableClusterInvoker。对于这种需要合并服务端结果的,通常都是业务进行合并,工作中还没用过dubbo的merger,暂时知道有这么个东西,后面有机会用下看看。

6.Mock

mock的集群是MockClusterWrapper,是个wrapper,用于对其它XxxClusterInvoker进行增强的,对应的Invoker是MockClusterInvoker,有两种用途

用途1:用于降级mock,配置了mock=true,则不进行实际调用。

用途2:用于拦截RPCException,只有在拦截到RPCException的时候会开启,属于异常容错的一种。业务层我们可以使用try...catch来实现这种功能,如果下沉到框架中的mock机制,则可以让业务层的实现更优雅。

mock的实现原理:

1.获取invoker上的mock参数,没有配置,则直接进行正常Invoker调用。和mock无关

2.配置了mock,且mock参数是force开头,说明是强制mock,执行doMockInvoke逻辑

3.配置了mock,mock参数非force开头,则进行正常Invoker调用。调用成功,返回业务结果数据。调用失败,执行doMockInvoke逻辑

强制mock和失败后的mock都会调用doMockInvoke逻辑,其步骤如下:

1.通过selectMockInvoker获得所有mock类型的Invoker。selectMockInvoker在对象的attchment属性中放入一个invocation.need.mock=true的标识。directory在list方法中列出所有invoker的时候,如果检测到这个标识,则使用MockInvokerSelector来过滤Invoker,而不是使用普通的route或tagrouter实现。最后返回mock类型的Invoker列表。如果一个mock类型的Invoker都没有返回,则通过directory的url新创建一个MockInvoker;如果有mock类型的Invoker,则使用第一个。

2.调用MockInvoker的invoke方法,在try-catch中调用invoke方法并返回结果。如果出现了异常,且是业务异常,则包装为一个RpcResult返回,否则返回RPCException。

还是需要测试下mock功能,用的少

推荐阅读