自学内容网 自学内容网

40分钟学 Go 语言高并发:负载均衡与服务治理

负载均衡与服务治理

一、知识要点总览

模块核心内容技术实现难度
负载策略轮询、权重、最小连接数自定义负载均衡器
服务降级服务降级、熔断降级、限流降级Hystrix模式
熔断机制熔断器状态机、失败计数、自动恢复Circuit Breaker
限流设计令牌桶、滑动窗口、计数器Rate Limiter

让我们开始具体实现:

1. 负载均衡实现

// loadbalancer/balancer.go
package loadbalancer

import (
    "sync"
    "sync/atomic"
    "time"
)

// 服务实例
type Instance struct {
    ID           string
    Host         string
    Port         int
    Weight       int
    Active       bool
    LastActive   time.Time
    Connections  int64  // 当前连接数
    FailCount    int64  // 失败计数
}

// 负载均衡器接口
type LoadBalancer interface {
    Select() (*Instance, error)
    UpdateInstances(instances []*Instance)
    MarkSuccess(instance *Instance)
    MarkFailed(instance *Instance)
}

// 轮询负载均衡器
type RoundRobinBalancer struct {
    instances []*Instance
    counter   uint64
    mu        sync.RWMutex
}

func NewRoundRobinBalancer() *RoundRobinBalancer {
    return &RoundRobinBalancer{
        instances: make([]*Instance, 0),
    }
}

func (b *RoundRobinBalancer) Select() (*Instance, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    
    if len(b.instances) == 0 {
        return nil, ErrNoAvailableInstances
    }
    
    // 获取当前计数
    count := atomic.AddUint64(&b.counter, 1)
    index := int(count % uint64(len(b.instances)))
    
    return b.instances[index], nil
}

// 加权轮询负载均衡器
type WeightedRoundRobinBalancer struct {
    instances     []*Instance
    weights       []int
    currentWeight int
    mu           sync.RWMutex
}

func NewWeightedRoundRobinBalancer() *WeightedRoundRobinBalancer {
    return &WeightedRoundRobinBalancer{
        instances: make([]*Instance, 0),
        weights:   make([]int, 0),
    }
}

func (b *WeightedRoundRobinBalancer) Select() (*Instance, error) {
    b.mu.Lock()
    defer b.mu.Unlock()
    
    if len(b.instances) == 0 {
        return nil, ErrNoAvailableInstances
    }
    
    totalWeight := 0
    var best *Instance
    bestWeight := -1
    
    for i, instance := range b.instances {
        if !instance.Active {
            continue
        }
        
        b.weights[i] += instance.Weight
        totalWeight += instance.Weight
        
        if bestWeight < b.weights[i] {
            bestWeight = b.weights[i]
            best = instance
        }
    }
    
    if best == nil {
        return nil, ErrNoAvailableInstances
    }
    
    for i := range b.weights {
        b.weights[i] -= totalWeight
    }
    
    return best, nil
}

// 最小连接数负载均衡器
type LeastConnectionBalancer struct {
    instances []*Instance
    mu        sync.RWMutex
}

func NewLeastConnectionBalancer() *LeastConnectionBalancer {
    return &LeastConnectionBalancer{
        instances: make([]*Instance, 0),
    }
}

func (b *LeastConnectionBalancer) Select() (*Instance, error) {
    b.mu.RLock()
    defer b.mu.RUnlock()
    
    if len(b.instances) == 0 {
        return nil, ErrNoAvailableInstances
    }
    
    var best *Instance
    minConn := int64(^uint64(0) >> 1) // 最大int64值
    
    for _, instance := range b.instances {
        if !instance.Active {
            continue
        }
        
        connections := atomic.LoadInt64(&instance.Connections)
        if connections < minConn {
            minConn = connections
            best = instance
        }
    }
    
    if best == nil {
        return nil, ErrNoAvailableInstances
    }
    
    // 增加连接数
    atomic.AddInt64(&best.Connections, 1)
    return best, nil
}

// 更新实例列表
func (b *LeastConnectionBalancer) UpdateInstances(instances []*Instance) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.instances = instances
}

// 标记请求成功
func (b *LeastConnectionBalancer) MarkSuccess(instance *Instance) {
    atomic.AddInt64(&instance.Connections, -1)
    atomic.StoreInt64(&instance.FailCount, 0)
    instance.LastActive = time.Now()
}

// 标记请求失败
func (b *LeastConnectionBalancer) MarkFailed(instance *Instance) {
    atomic.AddInt64(&instance.Connections, -1)
    failCount := atomic.AddInt64(&instance.FailCount, 1)
    
    // 如果失败次数过多,标记为不可用
    if failCount >= 3 {
        instance.Active = false
    }
}

2. 服务降级实现

// degradation/degradation.go
package degradation

import (
    "context"
    "sync"
    "time"
)

type DegradationLevel int

const (
    NoDegradation DegradationLevel = iota
    PartialDegradation
    FullDegradation
)

type DegradationRule struct {
    Name           string
    Threshold      float64
    TimeWindow     time.Duration
    Level          DegradationLevel
    RecoveryTime   time.Duration
}

type DegradationManager struct {
    rules       map[string]*DegradationRule
    states      map[string]*DegradationState
    mu          sync.RWMutex
}

type DegradationState struct {
    Level      DegradationLevel
    StartTime  time.Time
    EndTime    time.Time
    Metrics    map[string]float64
}

func NewDegradationManager() *DegradationManager {
    return &DegradationManager{
        rules:  make(map[string]*DegradationRule),
        states: make(map[string]*DegradationState),
    }
}

func (m *DegradationManager) AddRule(rule *DegradationRule) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.rules[rule.Name] = rule
}

func (m *DegradationManager) CheckDegradation(ctx context.Context, name string, value float64) DegradationLevel {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    rule, exists := m.rules[name]
    if !exists {
        return NoDegradation
    }
    
    state, exists := m.states[name]
    if !exists {
        state = &DegradationState{
            Level:   NoDegradation,
            Metrics: make(map[string]float64),
        }
        m.states[name] = state
    }
    
    // 更新指标
    state.Metrics["value"] = value
    
    // 如果当前处于降级状态,检查是否可以恢复
    if state.Level != NoDegradation {
        if time.Now().After(state.EndTime) {
            state.Level = NoDegradation
            state.StartTime = time.Time{}
            state.EndTime = time.Time{}
        } else {
            return state.Level
        }
    }
    
    // 检查是否需要降级
    if value > rule.Threshold {
        state.Level = rule.Level
        state.StartTime = time.Now()
        state.EndTime = state.StartTime.Add(rule.RecoveryTime)
        return rule.Level
    }
    
    return NoDegradation
}

// 降级处理器
type DegradationHandler struct {
    normal      func(context.Context) (interface{}, error)
    degraded    func(context.Context) (interface{}, error)
    fallback    func(context.Context) (interface{}, error)
}

func NewDegradationHandler(
    normal func(context.Context) (interface{}, error),
    degraded func(context.Context) (interface{}, error),
    fallback func(context.Context) (interface{}, error),
) *DegradationHandler {
    return &DegradationHandler{
        normal:   normal,
        degraded: degraded,
        fallback: fallback,
    }
}

func (h *DegradationHandler) Handle(ctx context.Context, level DegradationLevel) (interface{}, error) {
    switch level {
    case NoDegradation:
        return h.normal(ctx)
    case PartialDegradation:
        if h.degraded != nil {
            return h.degraded(ctx)
        }
        fallthrough
    case FullDegradation:
        if h.fallback != nil {
            return h.fallback(ctx)
        }
        return nil, ErrServiceDegraded
    default:
        return h.normal(ctx)
    }
}

3. 熔断器实现

// circuitbreaker/breaker.go
package circuitbreaker

import (
    "context"
    "sync"
    "time"
)

type State int

const (
    StateClosed State = iota    // 关闭状态(正常运行)
    StateOpen                   // 打开状态(熔断)
    StateHalfOpen              // 半开状态(尝试恢复)
)

type Settings struct {
    Name          string
    MaxRequests   uint32        // 熔断前的最大请求数
    Interval      time.Duration // 统计时间窗口
    Timeout       time.Duration // 熔断恢复时间
    Threshold     float64       // 错误率阈值
}

type CircuitBreaker struct {
    name          string
    state         State
    settings      Settings
    counts        Counts
    lastStateTime time.Time
    mu            sync.RWMutex
}

type Counts struct {
    Requests       uint32
    TotalFailures  uint32
    ConsecutiveFailures uint32
    LastFailureTime time.Time
}

func NewCircuitBreaker(settings Settings) *CircuitBreaker {
    return &CircuitBreaker{
        name:     settings.Name,
        state:    StateClosed,
        settings: settings,
        lastStateTime: time.Now(),
    }
}

func (cb *CircuitBreaker) Execute(ctx context.Context, run func() (interface{}, error)) (interface{}, error) {
    state := cb.GetState()
    
    switch state {
    case StateOpen:
        if !cb.shouldAttemptReset() {
            return nil, ErrCircuitBreakerOpen
        }
        cb.setState(StateHalfOpen)
        return cb.executeAndUpdateState(ctx, run)
        
    case StateHalfOpen:
        return cb.executeAndUpdateState(ctx, run)
        
    default: // StateClosed
        return cb.executeAndUpdateState(ctx, run)
    }
}

func (cb *CircuitBreaker) executeAndUpdateState(ctx context.Context, run func() (interface{}, error)) (interface{}, error) {
    defer func() {
        if r := recover(); r != nil {
            cb.recordFailure()
        }
    }()
    
    result, err := run()
    
    if err != nil {
        cb.recordFailure()
        return nil, err
    }
    
    cb.recordSuccess()
    return result, nil
}

func (cb *CircuitBreaker) recordSuccess() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    cb.counts.Requests++
    cb.counts.ConsecutiveFailures = 0
    
    if cb.state == StateHalfOpen {
        cb.setState(StateClosed)
    }
}

func (cb *CircuitBreaker) recordFailure() {
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    cb.counts.Requests++
    cb.counts.TotalFailures++
    cb.counts.ConsecutiveFailures++
    cb.counts.LastFailureTime = time.Now()
    
    if cb.shouldTrip() {
        cb.setState(StateOpen)
    }
}

func (cb *CircuitBreaker) shouldTrip() bool {
    if cb.counts.ConsecutiveFailures >= cb.settings.MaxRequests {
        return true
    }
    
    if cb.counts.Requests >= cb.settings.MaxRequests {
        failureRate := float64(cb.counts.TotalFailures) / float64(cb.counts.Requests)
        if failureRate >= cb.settings.Threshold {
            return true
        }
    }
    
    return false
}

func (cb *CircuitBreaker) shouldAttemptReset() bool {
    return time.Since(cb.lastStateTime) >= cb.settings.Timeout
}

func (cb *CircuitBreaker) GetState() State {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    
    now := time.Now()
    
    // 清理过期的计数
    if now.Sub(cb.lastStateTime) >= cb.settings.Interval {
        cb.mu.RUnlock()
        cb.mu.Lock()
        cb.counts = Counts{}
        cb.lastStateTime = now
        state := cb.state
        cb.mu.Unlock()
        cb.mu.RLock()
        return state
    }
    
    return cb.state
}

func (cb *CircuitBreaker) setState(state State) {
    cb.state = state
    cb.lastStateTime = time.Now()
    
    if state == StateClosed {
        cb.counts = Counts{}
    }
}

// 监控指标
type Metrics struct {
    State              State
    Requests           uint32
    TotalFailures      uint32
    ConsecutiveFailures uint32
    FailureRate        float64
    LastFailureTime    time.Time
}

func (cb *CircuitBreaker) Metrics() Metrics {
    cb.mu.RLock()
    defer cb.mu.RUnlock()
    
    var failureRate float64
    if cb.counts.Requests > 0 {
        failureRate = float64(cb.counts.TotalFailures) / float64(cb.counts.Requests)
    }
    
    return Metrics{
        State:               cb.state,
        Requests:            cb.counts.Requests,
        TotalFailures:       cb.counts.TotalFailures,
        ConsecutiveFailures: cb.counts.ConsecutiveFailures,
        FailureRate:         failureRate,
        LastFailureTime:     cb.counts.LastFailureTime,
    }
}

// 熔断器管理器
type BreakerManager struct {
    breakers map[string]*CircuitBreaker
    mu       sync.RWMutex
}

func NewBreakerManager() *BreakerManager {
    return &BreakerManager{
        breakers: make(map[string]*CircuitBreaker),
    }
}

func (m *BreakerManager) GetBreaker(name string) (*CircuitBreaker, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    breaker, exists := m.breakers[name]
    return breaker, exists
}

func (m *BreakerManager) AddBreaker(settings Settings) *CircuitBreaker {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    breaker := NewCircuitBreaker(settings)
    m.breakers[settings.Name] = breaker
    return breaker
}

// 自定义熔断策略
type TripStrategy interface {
    ShouldTrip(counts Counts) bool
}

// 连续失败策略
type ConsecutiveFailuresStrategy struct {
    Threshold uint32
}

func (s *ConsecutiveFailuresStrategy) ShouldTrip(counts Counts) bool {
    return counts.ConsecutiveFailures >= s.Threshold
}

// 错误率策略
type ErrorRateStrategy struct {
    Threshold    float64
    MinRequests  uint32
}

func (s *ErrorRateStrategy) ShouldTrip(counts Counts) bool {
    if counts.Requests < s.MinRequests {
        return false
    }
    
    failureRate := float64(counts.TotalFailures) / float64(counts.Requests)
    return failureRate >= s.Threshold
}

4. 限流器实现

// ratelimit/limiter.go
package ratelimit

import (
    "context"
    "sync"
    "time"
)

// 令牌桶限流器
type TokenBucket struct {
    rate       float64     // 令牌产生速率
    capacity   float64     // 桶容量
    tokens     float64     // 当前令牌数
    lastUpdate time.Time   // 上次更新时间
    mu         sync.Mutex
}

func NewTokenBucket(rate float64, capacity float64) *TokenBucket {
    return &TokenBucket{
        rate:       rate,
        capacity:   capacity,
        tokens:     capacity,
        lastUpdate: time.Now(),
    }
}

func (tb *TokenBucket) Allow() bool {
    return tb.AllowN(1)
}

func (tb *TokenBucket) AllowN(n float64) bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()
    
    now := time.Now()
    
    // 计算从上次更新到现在产生的令牌数
    elapsed := now.Sub(tb.lastUpdate).Seconds()
    tb.tokens = min(tb.capacity, tb.tokens+elapsed*tb.rate)
    tb.lastUpdate = now
    
    if tb.tokens < n {
        return false
    }
    
    tb.tokens -= n
    return true
}

// 滑动窗口限流器
type SlidingWindow struct {
    capacity   int                // 窗口容量
    timeWindow time.Duration      // 时间窗口大小
    windows    map[int64]int      // 各个小窗口的请求数
    mu         sync.Mutex
}

func NewSlidingWindow(capacity int, timeWindow time.Duration) *SlidingWindow {
    return &SlidingWindow{
        capacity:   capacity,
        timeWindow: timeWindow,
        windows:    make(map[int64]int),
    }
}

func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()
    
    now := time.Now().UnixNano()
    windowStart := now - sw.timeWindow.Nanoseconds()
    
    // 清理过期的窗口
    for timestamp := range sw.windows {
        if timestamp < windowStart {
            delete(sw.windows, timestamp)
        }
    }
    
    // 计算当前请求数
    var total int
    for _, count := range sw.windows {
        total += count
    }
    
    if total >= sw.capacity {
        return false
    }
    
    // 记录新请求
    currentWindow := now / int64(time.Second)
    sw.windows[currentWindow]++
    
    return true
}

// 漏桶限流器
type LeakyBucket struct {
    rate      float64     // 漏出速率
    capacity  float64     // 桶容量
    water     float64     // 当前水量
    lastLeakTime time.Time  // 上次漏水时间
    mu        sync.Mutex
}

func NewLeakyBucket(rate float64, capacity float64) *LeakyBucket {
    return &LeakyBucket{
        rate:     rate,
        capacity: capacity,
        lastLeakTime: time.Now(),
    }
}

func (lb *LeakyBucket) Allow() bool {
    lb.mu.Lock()
    defer lb.mu.Unlock()
    
    now := time.Now()
    
    // 计算从上次漏水到现在漏掉的水量
    elapsed := now.Sub(lb.lastLeakTime).Seconds()
    lb.water = max(0, lb.water-elapsed*lb.rate)
    lb.lastLeakTime = now
    
    if lb.water >= lb.capacity {
        return false
    }
    
    lb.water++
    return true
}

// 分布式限流器(基于Redis)
type DistributedRateLimiter struct {
    redis      RedisClient
    key        string
    rate       int
    window     time.Duration
}

type RedisClient interface {
    Eval(script string, keys []string, args ...interface{}) (interface{}, error)
}

func NewDistributedRateLimiter(redis RedisClient, key string, rate int, window time.Duration) *DistributedRateLimiter {
    return &DistributedRateLimiter{
        redis:  redis,
        key:    key,
        rate:   rate,
        window: window,
    }
}

const limitScript = `
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = tonumber(redis.call('get', key) or "0")
if current >= limit then
    return 0
end
redis.call('incr', key)
redis.call('expire', key, window)
return 1
`

func (rl *DistributedRateLimiter) Allow(ctx context.Context) bool {
    result, err := rl.redis.Eval(
        limitScript,
        []string{rl.key},
        rl.rate,
        int(rl.window.Seconds()),
    )
    if err != nil {
        return false
    }
    
    return result.(int64) == 1
}

func min(a, b float64) float64 {
    if a < b {
        return a
    }
    return b
}

func max(a, b float64) float64 {
    if a > b {
        return a
    }
    return b
}

5. 系统流程图

让我们通过一个流程图来展示整个系统的工作流程:
在这里插入图片描述

6. 使用示例

让我们看一个完整的使用示例:

// main.go
package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func main() {
    // 初始化限流器
    rateLimiter := ratelimit.NewTokenBucket(100, 1000) // 每秒100个请求,最多积攒1000个令牌
    
    // 初始化熔断器
    breaker := circuitbreaker.NewCircuitBreaker(circuitbreaker.Settings{
        Name:        "example-service",
        MaxRequests: 100,
        Interval:    time.Minute,
        Timeout:     time.Minute * 5,
        Threshold:   0.5, // 50%错误率触发熔断
    })
    
    // 初始化负载均衡器
    balancer := loadbalancer.NewWeightedRoundRobinBalancer()
    balancer.UpdateInstances([]*loadbalancer.Instance{
        {ID: "server1", Host: "localhost", Port: 8081, Weight: 2},
        {ID: "server2", Host: "localhost", Port: 8082, Weight: 1},
        {ID: "server3", Host: "localhost", Port: 8083, Weight: 1},
    })
    
    // 初始化服务降级管理器
    degradation := degradation.NewDegradationManager()
    degradation.AddRule(&degradation.DegradationRule{
        Name:         "high-load",
        Threshold:    0.8, // CPU使用率超过80%触发降级
        TimeWindow:   time.Minute,
        Level:        degradation.PartialDegradation,
        RecoveryTime: time.Minute * 5,
    })
    
    // HTTP处理器
    http.HandleFunc("/api/example", func(w http.ResponseWriter, r *http.Request) {
        // 限流检查
        if !rateLimiter.Allow() {
            http.Error(w, "Rate limit exceeded", http.StatusTooManyRequests)
            return
        }
        
        // 获取降级状态
        degradationLevel := degradation.CheckDegradation(r.Context(), "high-load", getCPUUsage())
        
        // 处理降级情况
        handler := degradation.NewDegradationHandler(
            // 正常处理
            func(ctx context.Context) (interface{}, error) {
                return breaker.Execute(ctx, func() (interface{}, error) {
                    // 选择服务实例
                    instance, err := balancer.Select()
                    if err != nil {
                        return nil, err
                    }
                    
                    // 调用服务
                    resp, err := callService(instance)
                    if err != nil {
                        // 标记失败
                        balancer.MarkFailed(instance)
                        return nil, err
                    }
                    
                    // 标记成功
                    balancer.MarkSuccess(instance)
                    return resp, nil
                })
            },
            // 部分降级处理
            func(ctx context.Context) (interface{}, error) {
                // 返回缓存数据
                return getFromCache(ctx)
            },
            // 完全降级处理
            func(ctx context.Context) (interface{}, error) {
                // 返回降级默认值
                return getDefaultResponse(ctx)
            },
        )
        
        // 执行请求处理
        result, err := handler.Handle(r.Context(), degradationLevel)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            return
        }
        
        // 返回结果
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(result)
    })
    
    // 监控处理器
    http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        metrics := map[string]interface{}{
            "circuit_breaker": breaker.Metrics(),
            "rate_limiter": map[string]interface{}{
                "qps": rateLimiter.QPS(),
                "total_requests": rateLimiter.TotalRequests(),
            },
            "load_balancer": balancer.Metrics(),
        }
        
        w.Header().Set("Content-Type", "application/json")
        json.NewEncoder(w).Encode(metrics)
    })
    
    // 启动服务器
    log.Fatal(http.ListenAndServe(":8080", nil))
}

// 辅助函数
func callService(instance *loadbalancer.Instance) (interface{}, error) {
    url := fmt.Sprintf("http://%s:%d/api", instance.Host, instance.Port)
    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
    defer cancel()
    
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("service returned status: %d", resp.StatusCode)
    }
    
    var result interface{}
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return nil, err
    }
    
    return result, nil
}

func getFromCache(ctx context.Context) (interface{}, error) {
    // 实现缓存读取逻辑
    cache := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    defer cache.Close()
    
    value, err := cache.Get(ctx, "cache_key").Result()
    if err != nil {
        return nil, err
    }
    
    var result interface{}
    if err := json.Unmarshal([]byte(value), &result); err != nil {
        return nil, err
    }
    
    return result, nil
}

func getDefaultResponse(ctx context.Context) (interface{}, error) {
    // 返回降级默认响应
    return map[string]interface{}{
        "status": "degraded",
        "data": map[string]interface{}{
            "message": "Service is temporarily degraded",
            "fallback_data": []string{"default", "response", "data"},
        },
    }, nil
}

func getCPUUsage() float64 {
    var status syscall.Statfs_t
    if err := syscall.Statfs("/", &status); err != nil {
        return 0
    }
    
    // 获取CPU使用率
    percent, err := cpu.Percent(time.Second, false)
    if err != nil {
        return 0
    }
    
    if len(percent) > 0 {
        return percent[0]
    }
    
    return 0
}

// 配置项
type Config struct {
    RateLimit struct {
        QPS      float64       `yaml:"qps"`
        Capacity float64       `yaml:"capacity"`
    } `yaml:"rate_limit"`
    
    CircuitBreaker struct {
        MaxRequests uint32        `yaml:"max_requests"`
        Interval    time.Duration `yaml:"interval"`
        Timeout     time.Duration `yaml:"timeout"`
        Threshold   float64       `yaml:"threshold"`
    } `yaml:"circuit_breaker"`
    
    LoadBalancer struct {
        Instances []struct {
            ID     string `yaml:"id"`
            Host   string `yaml:"host"`
            Port   int    `yaml:"port"`
            Weight int    `yaml:"weight"`
        } `yaml:"instances"`
    } `yaml:"load_balancer"`
    
    Degradation struct {
        Rules []struct {
            Name         string        `yaml:"name"`
            Threshold    float64       `yaml:"threshold"`
            TimeWindow   time.Duration `yaml:"time_window"`
            Level        string        `yaml:"level"`
            RecoveryTime time.Duration `yaml:"recovery_time"`
        } `yaml:"rules"`
    } `yaml:"degradation"`
}

func loadConfig(filename string) (*Config, error) {
    data, err := ioutil.ReadFile(filename)
    if err != nil {
        return nil, err
    }
    
    var config Config
    if err := yaml.Unmarshal(data, &config); err != nil {
        return nil, err
    }
    
    return &config, nil
}

让我们继续完成使用示例的实现:

7. 配置示例

让我们看一个配置文件的示例:

# config.yaml
rate_limit:
  qps: 100
  capacity: 1000

circuit_breaker:
  max_requests: 100
  interval: 60s
  timeout: 300s
  threshold: 0.5

load_balancer:
  instances:
    - id: server1
      host: localhost
      port: 8081
      weight: 2
    - id: server2
      host: localhost
      port: 8082
      weight: 1
    - id: server3
      host: localhost
      port: 8083
      weight: 1

degradation:
  rules:
    - name: high-load
      threshold: 0.8
      time_window: 60s
      level: partial
      recovery_time: 300s
    - name: error-rate
      threshold: 0.3
      time_window: 60s
      level: full
      recovery_time: 300s

8. 关键功能说明

  1. 负载均衡:

    • 轮询策略
    • 加权轮询
    • 最小连接数
    • 实例健康检查
    • 动态更新实例列表
  2. 服务降级:

    • 多级降级策略
    • 基于指标的降级
    • 自动恢复机制
    • 降级处理器
  3. 熔断机制:

    • 状态管理
    • 失败计数
    • 自动恢复
    • 半开状态试探
  4. 限流设计:

    • 令牌桶算法
    • 滑动窗口
    • 漏桶算法
    • 分布式限流

这个完整的服务治理系统提供了:

  1. 全面的服务保护机制
  2. 灵活的配置选项
  3. 可扩展的设计
  4. 完整的监控指标
  5. 多种降级策略
  6. 分布式支持

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!


原文地址:https://blog.csdn.net/weixin_40780178/article/details/144337715

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!