自学内容网 自学内容网

SpringCloud第六章(服务保护CircuitBreaker) -2024

目录

1:什么是CircuitBreaker

2:CircuitBreaker的实现Resilience4J

3:Resilience4J的主要模块和架构

3.1:主要模块

3.2:Resilience4J的状态

4:服务熔断(CircuitBreaker) 请求判断成功率

4.1:基于计数器的滑动窗口

4.1.1:导包resilience4j的依赖包

4.1.2:yml文件添加配置

4.1.3:测试目标

 4.1.4:代码验证 

4.1.5:结果验证

4.2:基于时间的滑动窗口 

4.2.1:导包resilience4j的依赖包

4.2.2:yml文件添加配置

4.2.3:测试目标

4.2.3:代码测试

4.2.5:结果验证

5:服务隔离(Bulkhead)单位时间限量不限速

5.1:信号量Bulkhead舱壁实现服务隔离

5.2:线程池Bulkhead舱壁实现服务隔离

6:服务限流(RateLimiter) 单位时间限制速度

6.1:导包限流依赖包

6.2:yml文件添加配置

6.3:代码实现

6.4:结果验证


1:什么是CircuitBreaker

CircuitBreaker是断路器的意思,由于原来的SpringCoud的hystrix停更,所以springcloud社区推出了的新断路器,用来进行springcloud的服务降级、限流、熔断

Spring Cloud Circuit BreakerCircuitBreaker官网:Spring Cloud Circuit Breaker

由于Spring Cloud断路器(CircuitBreaker)提供了不同断路器实现的抽象,

支持的实现有两种Resilience4J和Spring Retry的实现

在Spring Cloud CircuitBreaker中实现的API位于Spring Cloud Commons中。这些API的使用文档位于Spring Cloud Commons文档中。

2:CircuitBreaker的实现Resilience4J

我们主要学习Resilience4J,Resilience4J的官网

GitHub - resilience4j/resilience4j: Resilience4j is a fault tolerance library designed for Java8 and functional programming

Resilience4j是受到Netflix Hystrix的启发,为Java8和函数式编程所设计的轻量级容错框架。整个框架只是使用了Varr的库,不需要引入其他的外部依赖。与此相比,Netflix Hystrix对Archaius具有编译依赖,而Archaius需要更多的外部依赖,例如Guava和Apache Commons Configuration。

Resilience4j提供了提供了一组高阶函数(装饰器),包括断路器,限流器,重试机制,隔离机制。你可以使用其中的一个或多个装饰器对函数式接口,lambda表达式或方法引用进行装饰。这么做的优点是你可以选择所需要的装饰器进行装饰。

在使用Resilience4j的过程中,不需要引入所有的依赖,只引入需要的依赖即可。

以上来自官网,这里不粘贴太多了,详情GitHub官网。

3:Resilience4J的主要模块和架构

3.1:主要模块

主要模块的作用是方便我们根据模块,了解不同的功能实现

resilience4j-circuitbreaker: 熔断

resilience4j-ratelimiter: 限流

resilience4j-bulkhead: 隔离

resilience4j-retry: 自动重试(同步,异步)

resilience4j-cache: 结果缓存

resilience4j-timelimiter: 超时处理

3.2:Resilience4J的状态

Resilience4J的状态装换,便于我们理解限流、降级、熔断的功能实现

断路器有三个普通状态

1:关闭(CLOSED):

服务可以正常访问,所有请求都能接受

2:开启(OPEN):

服务不能访问,当我们设置一些请求按照我们的规则,比如10个请求在滑动窗口下成功率小于50%,也就是大于5个失败,服务进入关闭状态,不能访问。新的请求走fallbackMethod,提示服务繁忙

3:半开(HALFOPEN)

按照我们设置的规则,比如由于10个请求成功率低的原因服务进入open状态,不能访问。

但是过了N秒(我们自己设置)进入半开状态,可以允许指定的请求再次打进来比如只进来2个,不是所有请求,重新计算成功率,这个状态就是半开。

在半开状态下重新计算成功率,成功率达标,则说明服务健康了,服务进入关闭状态,可以大量访问。否则进入open状态,不能访问。再次开启这个循环往复

还有两个特殊状态:禁用(DISABLED)、强制开启(FORCED OPEN)。

4:服务熔断(CircuitBreaker) 请求判断成功率

为什么需要对服务进行熔断降级:

当下游的服务因为某种原因突然变得不可⽤或响应过慢,上游服务会一直占用线程资源,服务变得不可用。上游服务为了保证⾃⼰整体服务的可⽤性,不再继续调⽤⽬标服务,直接返回,快速释放资源。如果⽬标服务情况好转则恢复调⽤。熔断器模型,如图所示

4.1:基于计数器的滑动窗口

4.1.1:导包resilience4j的依赖包

  <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.3</spring-cloud.version>
    </properties>
  
<!--circuitbreaker 断路器的resilience4j 实现依赖aop-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>


 <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

4.1.2:yml文件添加配置

#1:按照请求次数失败率的滑动窗口
resilience4j:
  timelimiter: #这里很重要 默认请求远程限制是1S,1S没有返回值就报错,服务降级
    configs:
      default: #这里是默认值
        timeout-duration: 10s #默认请求远程限制是1S,1S没有返回值就报错,服务降级
          #seconds: 10 这里配置不生效
  circuitbreaker:
    configs:
      default: #default配置
        failure-rate-threshold: 50 #故障阈值 超过50%s失败触发断路器状态从 close->open
        sliding-window-type: COUNT_BASED #滑动窗口类型 按照计数器统计 TIME_BASED时间统计
        sliding-window-size: 6 #滑动窗口大小 6个表示6个请求, TIME_BASED的话是6秒
        minimum-number-of-calls: 6 #最小通话次数 表示滑动窗口的统计样本数是6个,最少6个样本计算失败率
        automatic-transition-from-open-to-half-open-enabled: true #启用从open到half-open的自动转换 默认true
        wait-duration-in-open-state: #从open到half-open状态下的等待时间 等待5秒
          seconds: 5
        permitted-number-of-calls-in-half-open-state: 2 #half-open状态下允许的通话次数,
        record-exceptions:
          - java.lang.Exception
    instances:
      PayService: #PayService实例来使用default配置
        base-config: default

4.1.3:测试目标

在6次访问中,失败达到50%,状态从close(请求可以访问)到open(禁止新的请求访问)。

然后经过等待时间5秒后,断路器状态从open过度到half-oepn(半开状态),允许一些自己设置的

请求作为测试从新计算成功率,成功路达标,则状态回复为close状态,否则状态继续是open(禁访问)。从新开始新的循环

 4.1.4:代码验证 

这个服务端口是8080

    @GetMapping(value = "/consumer/pay/circuit/{id}")
    @CircuitBreaker(name = "PayService", fallbackMethod = "myFallback")
    public String getDemo(@PathVariable(value = "id") Integer id) {
        String s;
        if(id==2){
            int a=10/0;
        }
        System.out.println("Order断路器开始:" + DateUtil.now()+":"+Thread.currentThread().getName());
        //feign接口调用consul的外部微服务服务
        s = payFeignApi.getCircuitBreaker(id);

        System.out.println("Order断路器结束:" + DateUtil.now()+":"+Thread.currentThread().getName());
        return s;
    }



  //feign接口调用consul的外部微服务服务
//PayController接口的方法和服务端方法名字一样
@FeignClient(value = "PayService") //value的名字是consul中注册的服务名字
public interface PayFeignApi {

    @GetMapping(value = "/pay/circuit/{id}")
    String getCircuitBreaker(@PathVariable(value = "id") int id);

}


//==============分割线:上边的代码是8080,调用8091的服务 ====================

//==============分割线:下边的代码是8091 ====================
    @GetMapping(value = "/pay/circuit/{id}")
    public String get(@PathVariable(value = "id") int id){
        System.out.println("Pay断路器开始:"+ DateUtil.now());

        if (id==-1){
            throw new RuntimeException("id不能为-1");
        }
        if (id==999){
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("Pay断路器异常:"+ DateUtil.now());
                //throw new RuntimeException(e);
            }
        }
        System.out.println("Pay断路器结束:"+ DateUtil.now());
        return "Pay断路器测试:"+id+"\t"+ IdUtil.simpleUUID();
    }
}

4.1.5:结果验证

但是一旦访问数量大于50%,就会进入熔断状态

4.2:基于时间的滑动窗口 

4.2.1:导包resilience4j的依赖包

跟计数器的导包一样

4.2.2:yml文件添加配置

##2:按照时间窗口失败率的滑动窗口
resilience4j:
  timelimiter:
    configs:
      default: #default配置
        timeout-duration: 7S #默认请求远程限制是1S,1S没有返回值就报错,服务降级
  circuitbreaker:
    configs:
      default: #default配置
       failure-rate-threshold: 50 #故障阈值 超过50%s失败触发断路器状态从 close->open
       slow-call-duration-threshold: 2S
         #seconds: 2 #慢调用时间阈值,大于2秒就是慢调用,增加慢调用统计
       slow-call-rate-threshold: 30 #慢调用比例30% 超过30% 服务降级
       sliding-window-type: TIME_BASED #滑动窗口类型 TIME_BASED时间统计
       sliding-window-size: 2 #滑动窗口大小
       minimum-number-of-calls: 2 #计算失败率和慢调用的最小样本
       permitted-number-of-calls-in-half-open-state: 2 #进入half-open状态 后续状态转换需要的请求数量
       wait-duration-in-open-state: 5S
         #seconds: 5 #从open到半开的等待时间 之后才能再次发送请求
       record-exceptions: #异常类型
         - java.lang.Exception
    instances:
      PayService: #consul的实例名字
        base-config: default #使用的断路器类型

4.2.3:测试目标

在多次访问中,失败达到100%,全部都是请求id=999的请求,8080调用8091,8091服务睡眠10秒

满足了设置的最慢时间2秒,都是慢请求

状态从close(请求可以访问)到open(禁止新的请求访问)。

然后经过等待时间5秒后,断路器状态从open过度到half-oepn(半开状态),允许一些自己设置的

请求作为测试从新计算成功率,成功路达标,则状态回复为close状态,否则状态继续是open(禁访问)。从新开始新的循环

4.2.3:代码测试

跟基于技术的代码一致这里不复制了

4.2.5:结果验证

5:服务隔离(Bulkhead)单位时间限量不限速

为什么需要服务隔离?

服务隔离是指通过技术手段,将系统中的不同服务(如数据库服务、外部API服务、缓存服务等)在逻辑上或物理上进行分离,以避免某个服务的故障或性能问题影响其他服务。服务隔离的主要目标是减少系统中的单点故障,提高系统的可用性和稳定性。主要是限制请求数量

Resilience4j提供了两种隔离的实现方式,可以限制并发执行的数量。

        SemaphoreBulkhead使用了信号量

        FixedThreadPoolBulkhead使用了有界队列和固定大小线程池

5.1:信号量Bulkhead舱壁实现服务隔离

信号量Bulkhead舱壁实现服务隔离,底层依赖juc的信号量

1:导入pom

        <!-- 舱壁的依赖包  -->
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-bulkhead</artifactId>
        </dependency>

2:增加配置

#2.1:服务隔离的实现舱壁 基于juc的信号量
resilience4j:
  timelimiter:
    configs:
      default:
        timeout-duration: 10S #默认请求远程限制是1S,1S没有返回值就报错。这里设置10S,因为调用的远程服务8091会睡眠5s
  bulkhead:
    configs:
      default: #默认配置
        max-concurrent-calls: 2 #最大信号量是2,当服务超过请求超过2的时候,其他的服务直接开始等待
        max-wait-duration: 2s #信号量占满的时候 只愿意等待2S,得不到信号量直接走回调函数
    instances:
      backendA: #自定义名字 服务A 也就是本案例使用的服务名字
        base-config: default
      backendB: #自定义名字 服务B
        max-concurrent-calls: 10 
        max-wait-duration: 2s

3:代码实现

 /**
     * 舱壁隔离测试(信号量、线程池) 8080服务调用consul的8091
     *
     * @Bulkhead(
     * name = "PayService", 配置文件的服务名字
     * fallbackMethod = "myFallback1", 回调方法
     * 
     * type = Bulkhead.Type.SEMAPHORE) 信号量
     * type = Bulkhead.Type.THREADPOOL) 线程池
     * @return
     */
    @GetMapping(value = "/consumer/pay/bulkhead/{id}")
    @Bulkhead(name = "backendA",fallbackMethod = "myFallback1",type = Bulkhead.Type.SEMAPHORE)
    public String getDemo1(@PathVariable(value = "id") Integer id) {
        String s;
        if(id==2){
            int a=10/0;
        }
        System.out.println("Order舱壁开始:" + DateUtil.now()+":"+Thread.currentThread().getName());
        //feign调用consul的外部微服务服务 当id等于999的时候 远程服务睡眠5s
        s = payFeignApi.getCircuitBreaker(id);
        System.out.println("Order舱壁结束:" + DateUtil.now()+":"+Thread.currentThread().getName());
        return s;
    }


    //舱壁隔离的兜底方法
    public String myFallback1(@PathVariable(value = "id") Integer id, Exception e) {
        System.out.println("执行myFallback1"+ DateUtil.now());
        System.out.println("Order舱壁结束myFallback:" + DateUtil.now());
        return "系统繁忙,稍后再试!" + id + "/" + e.getMessage();
    }



//==============分割线:上边的代码是8080,调用8091的服务 ====================

//==============分割线:下边的代码是8091 ====================
    @GetMapping(value = "/pay/circuit/{id}")
    public String get(@PathVariable(value = "id") int id){
        System.out.println("Pay断路器开始:"+ DateUtil.now());

        if (id==-1){
            throw new RuntimeException("id不能为-1");
        }
        if (id==999){
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("Pay断路器异常:"+ DateUtil.now());
                //throw new RuntimeException(e);
            }
        }
        System.out.println("Pay断路器结束:"+ DateUtil.now());
        return "Pay断路器测试:"+id+"\t"+ IdUtil.simpleUUID();
    }
}



4:效果展示

5.2:线程池Bulkhead舱壁实现服务隔离

线程池是一个固定线程+有界队列,请求先申请固定的线程池中的线程,获取不到排队进入有界等待队列,队列也占满了直接报错。

1:导入pom 跟5.1一致

2:增加配置

#2.2:服务隔离的实现舱壁 基于线程池
resilience4j:
  timelimiter:
    configs:
      default:
        timeout-duration: 10S #resilience4j默认请求远程限制是1S,1S没有返回值就报错。这里设置10S,因为调用的远程服务8091会睡眠5s
  thread-pool-bulkhead:
    configs:
      default: #默认配置
        core-thread-pool-size: 2 #初始化线城池
        max-thread-pool-size: 2 #线程池最大
        queue-capacity: 1 #等待队列长度1
    instances:
      backendB: #自定义名字 服务A 也就是本案例使用的服务名字,使用了default配置
        base-config: default

3:代码实现 

 /**
     * 舱壁隔离测试(线程池) 8080服务 异步调用
     *
     * @Bulkhead(
     * name = "backendB", 配置文件的服务名字
     * fallbackMethod = "myFallback2", 回调方法
     *
     * type = Bulkhead.Type.SEMAPHORE) 信号量
     * type = Bulkhead.Type.THREADPOOL) 线程池
     * @return
     */
    @GetMapping(value = "/consumer/pay/bulkhead1/{id}")
    @Bulkhead(name = "backendB",fallbackMethod = "myFallback2",type = Bulkhead.Type.THREADPOOL)
    public CompletableFuture<String> getDemo2(@PathVariable(value = "id") Integer id) {
        System.out.println("Order舱壁开始:" + DateUtil.now()+":"+Thread.currentThread().getName());
        //feign调用consul的外部微服务服务 当id等于999的时候 远程服务睡眠5s
        //s = payFeignApi.getCircuitBreaker(id);
        final var stringCompletableFuture = CompletableFuture.supplyAsync(
                () -> payFeignApi.getCircuitBreaker(id)
        );
        System.out.println("Order舱壁结束:" + DateUtil.now()+":"+Thread.currentThread().getName());

        return stringCompletableFuture;
    }

    //舱壁隔离的兜底方法
    public CompletableFuture<String> myFallback2(@PathVariable(value = "id") Integer id, Exception e) {
        System.out.println("执行myFallback1"+ DateUtil.now());
        System.out.println("Order舱壁结束myFallback:" + DateUtil.now());
        return CompletableFuture.supplyAsync(
                ()->"系统繁忙,稍后再试!" + id + "/" + e.getMessage()
        );
    }




//==============分割线:上边的代码是8080,调用8091的服务 ====================

//==============分割线:下边的代码是8091 ====================
    @GetMapping(value = "/pay/circuit/{id}")
    public String get(@PathVariable(value = "id") int id){
        System.out.println("Pay断路器开始:"+ DateUtil.now());

        if (id==-1){
            throw new RuntimeException("id不能为-1");
        }
        if (id==999){
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("Pay断路器异常:"+ DateUtil.now());
                //throw new RuntimeException(e);
            }
        }
        System.out.println("Pay断路器结束:"+ DateUtil.now());
        return "Pay断路器测试:"+id+"\t"+ IdUtil.simpleUUID();
    }
}

4:效果展示

三个请求调用远程的8091,8091服务睡眠5秒,1秒之内导致线程池2和队列1占满,新的请求走了回调函数,起到了限制单位时间限制请求数量的要求

6:服务限流(RateLimiter) 单位时间限制速度

什么是限流:限流是一种必不可少的技术,可以帮助您的API进行扩展,并建立服务的高可用性和可靠性。但是,这项技术还附带了一堆不同的选项,比如如何处理检测到的多余流量,或者您希望限制什么类型的请求。有好几种算法可以实现,比如令牌桶、漏斗算法等,主要就是控制请求以一定的速度进入方法。

漏斗算法的缺点:水桶可以设置存放尽可能多的请求,但是漏斗过滤后,请求是一个一个的发送

令牌桶算法如下:

默认使用滑动窗口算法

6.1:导包限流依赖包

  <!-- 速率限制器的依赖包 用于服务限流  -->
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-ratelimiter</artifactId>
        </dependency>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

6.2:yml文件添加配置

#3:服务限流的实现 基于令牌桶
resilience4j:
  timelimiter:
    configs:
      default:
        timeout-duration: 10S #resilience4j默认请求远程限制是1S,1S没有返回值就报错。这里设置10S,因为调用的远程服务8091会睡眠5s
  ratelimiter:
    configs:
      default:
        limit-for-period: 2 #在一次刷新周期内,允许执行的最大请求数 默认50
        limit-refresh-period: 1s # 限流器每隔limitRefreshPeriod刷新一次,将允许处理的最大请求数量重置为limitForPeriod。
        timeout-duration: 1 #线程等待权限的默认等待时间5S
    instances:
      ratelimiterA: #自定义名字 ratelimiterA 也就是本案例使用的服务名字,使用了default配置
        base-config: default

6.3:代码实现

 /**
     * 限流(滑动窗口) 8080服务 调用8019,被限流后 不会进去getRateLimiter方法,直接走毁掉
     * 注解: @RateLimiter(name = "ratelimiterA",fallbackMethod = "myFallback3")
     */
    @GetMapping(value = "/consumer/pay/rateLimiter/{id}")
    @RateLimiter(name = "ratelimiterA",fallbackMethod = "myFallback3")
    public String getRateLimiter(@PathVariable(value = "id") Integer id) {
        System.out.println("Order限流开始:" + DateUtil.now()+":"+Thread.currentThread().getName());
        //feign调用consul的外部微服务服务 当id等于999的时候 远程服务睡眠5s
        if(id==2){
            int a=10/0;//验证就是错误了也能继续访问 只要速度不错过
        }
        String s = payFeignApi.getCircuitBreaker(id);
        System.out.println("Order限流结束:" + DateUtil.now()+":"+Thread.currentThread().getName());
        return s;
    }

    //舱壁隔离的兜底方法
    public String myFallback3(@PathVariable(value = "id") Integer id, Throwable e) {
        System.out.println("执行myFallback3"+ DateUtil.now());
        System.out.println("Order限流结束myFallback3:" + DateUtil.now());
        return "你被限流了,稍后再试!" + id + "/" + e.getMessage();
    }

6.4:结果验证


原文地址:https://blog.csdn.net/huyiju/article/details/142499919

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!