自学内容网 自学内容网

⭐Redis - 手动实现分布式锁 & Redisson 的使用

概述

  1. 定义:分布式系统或集群模式下,多进程或多节点之间 “可见” 并且 “互斥” 的锁机制

  2. 功能:确保同一时刻只有一个进程或节点能够获取某项资源的访问权

  3. 特点

    1. 互斥
    2. 高可用
    3. 多进程可见
    4. 高并发 (高性能)
    5. 安全性 (避免死锁问题)
  4. 常见的分布式锁

    MySQLRedisZookeeper
    互斥MySQL 本身的互斥锁机制利用 setnx 互斥命令利用节点的唯一性和有序性实现互斥
    高可用
    高性能一般一般 (强调一致性)
    安全性断开连接,自动释放锁利用锁超时时间,到期释放临时节点,断开连接自动释放

Redis 分布式锁

一、核心思路

  1. 运行逻辑
    1. 线程进入时,利用 redis 的setNx 方法添加 key 作为逻辑锁
    2. 返回结果 == 1 → 当前线程抢到了锁,继续执行业务逻辑后删除 key,逻辑上释放锁
    3. 返回结果 == 0 → 等待一定时间后重新尝试获取锁
  2. 实现途径:setNx 方法(添加代表锁的 key,如果没有当前 key 则添加当前 key-value 并返回 1,已有当前 key 则返回 0)
  3. 最佳实践
    1. 在 Util 包下创建一个分布式锁的类,专门供业务模块(xxxService)获取锁和释放锁
    2. 在 ServiceImpl 的代码中调用 Util 中的锁工具并手动 ”上锁 / 释放锁”

二、获取锁

  1. 功能:如果 Redis 中不存在当前锁,则在 Redis 中写入锁名 / 线程名 / 超时释放时间
  2. Redis命令:SET myLockName myThreadName NX EX myExpireTime
  3. 核心代码:stringRedisTemplate.opsForValue().setIfAbsent( KEY_PREFIX + keyName, threadId, time, TimeUnit.SECONDS )

三、释放锁

  1. 功能:逻辑上释放分布式锁,让该服务不再占用资源
  2. 实现流程
    1. 判断 Redis 中存在对此业务的锁
      1. 存在锁 → 判断这个锁是否是当前线程获取
        1. 锁属于当前线程 → 释放锁(删除 redis 中的 key-value)
        2. 锁不属于当前线程 → 拒绝释放锁
      2. 不存在锁 → 释放锁失败
  3. Redis命令:DEL myLockName

四、常见问题

误删问题

  1. 问题引入
    1. 持有锁的线程 1 出现阻塞,导致锁超时自动释放
    2. 线程2 请求获取锁,成功获得锁并开始执行业务逻辑
    3. 线程 1 唤醒,继续执行业务逻辑并删除锁 (误删其他线程的锁)
  2. 核心思路 :key 作为锁,value 用来标识是属于哪个进程的锁
  3. 解决方案
    1. 存入锁时,value 中放入当前线程的标识(声明锁的主人)
    2. 删除锁时,判断当前锁的 value 是否包含当前线程的标识(检查是否由主人自己解锁)
      1. 锁属于当前线程 → 删除
      2. 锁不属于当前线程 → 拒绝删除

原子性问题

  1. 问题引入

    1. 线程 1 持有锁并执行业务逻辑后,已经判断锁属于自己
    2. 线程1准备删除锁,但是此时锁过期
    3. 线程2请求获取锁,获取锁成功
    4. 线程1继续执行删除锁逻辑 (误删其他线程的锁)
  2. 核心思路 :通过 Lua 脚本保证 “判断锁” 与 “删除锁” 的原子性

  3. Redis 的调用函数:redis.call('命令名称', 'key', '其它参数', ...)

  4. 代码实现

    1. 目标:保证 unlock 操作的判断锁和删除锁是原子性操作 ( Lua 脚本 )
    -- 定义 Lua 脚本,用于安全释放分布式锁
    local key = KEYS[1]                -- 锁的键名
    local threadId = ARGV[1]           -- 当前线程标识
    
    -- 判断锁是否属于当前线程
    if (redis.call('GET', key) == threadId) then
        return redis.call('DEL', key)  -- 锁属于当前线程 -> 删除锁
    end
    return 0                           -- 锁不属于当前线程 -> 返回失败
    

可重入问题

  1. 目标:对于当前获取了锁的进程,可以再次获取自己的锁

  2. 原理

    1. 借助底层的一个 voaltile 的一个 state 变量来记录重入的状态

    2. 没有进程持有当前锁(state=0) ⇒ 线程请求当前锁(state=1)

    3. 持有这把锁的线程再次请求当前锁 ⇒ state++

    4. 持有这把锁的线程释放当前锁 ⇒ state--

    5. state ==0 ⇒ 释放当前锁

  3. 参数

    1. KEYS[1]:锁名称
    2. ARGV[1]:锁失效时间
    3. ARGV[2]:id + “:” + threadId ,表示持有锁的进程 (field)

过期问题

  1. 问题引入:如果任务执行时间超过锁的过期时间,锁可能会被误删

  2. 解决方案:看门狗机制,即在锁的过期时间内,通过后台线程周期性地延长锁的有效期

  3. 代码实现

    // 全局调度器,负责分配任务
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
    // 使用 ThreadLocal 保存当前线程的续约任务
    private final ThreadLocal<ScheduledFuture<?>> renewalTask = new ThreadLocal<>();
    
    public void lockWithWatchdog() {
        lock();
        // 启动定时任务
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            stringRedisTemplate.expire(KEY_PREFIX + name, RELEASE_TIME_IN_SECONDS, TimeUnit.SECONDS);
        },                           // 续约操作
        RELEASE_TIME_IN_SECONDS / 2, // initialDelay: 延迟多长时间后首次执行任务
        RELEASE_TIME_IN_SECONDS / 2, // period: 两次任务之间的时间间隔
        TimeUnit.SECONDS);           // 时间单位:这里为秒
        
        renewalTask.set(task);       // 将当前线程的续约任务保存到 ThreadLocal 中
    }
    
    public void unlockWithWatchdog() {
    // 解锁
        unlock();
        
        // 从 ThreadLocal 中获取当前线程的续约任务
        ScheduledFuture<?> task = renewalTask.get();
        if (task != null) {
            task.cancel(false);   // 取消当前续约任务
            renewalTask.remove(); // 清理 ThreadLocal,防止内存泄漏
        }
    }
    

解决方案

  1. "获取锁" 脚本(lock.lua)

    local key = KEYS[1];                       -- key:锁的id
    local threadId = ARGV[1];                  -- threadId:线程的id
    local releaseTime = tonumber(ARGV[2]);     -- releaseTime:锁的自动释放时间(失效时间)
    
    -- 锁(key)不存在
    if( redis.call('exists', key) == 0 ) then
    redis.call('hset', key, threadId, 1);            -- 成功获取锁
    redis.call('pexpire', key, releaseTime);         -- 设置有效期
    return 1;                                        -- 返回成功
    end;
    
    -- 锁(key)已存在 && 锁(threadId)属于当前线程
    if( redis.call('hexists', key, threadId ) == 1 ) then
    redis.call( 'hincrby', key, threadId, 1 );       -- 当前线程的锁 +1
    redis.call( 'pexpire', key, releaseTime );       -- 更新有效期
    return 1;                                        -- 返回成功
    end;
    
    -- 锁存在 && 锁不属于当前线程,返回失败(当前锁已被占用)
    return 0;
    
  2. ”释放锁“ 脚本(unlock.lua)

    local key = KEYS[1];                       -- key:锁的id
    local threadId = ARGV[1];                  -- threadId:线程的id
    local releaseTime = tonumber(ARGV[2]);     -- releaseTime:锁的自动释放时间(失效时间)
    
    -- 当前锁不属于当前进程
    if( redis.call('HEXISTS', key, threadId ) == 0 ) then
    return 0;
    end;
    
    -- 当前锁属于当前进程, 重入次数 -1
    local count = redis.call('HINCRBY', key, threadId, -1 );
    
    -- 重入次数已经为0,释放锁
    if( count  == 0 ) then
    redis.call('DEL', key);
    return 1;
    end;
    
    -- 重入次数还不为0,更新有效期
    redis.call('PEXPIRE', key, releaseTime);
    return 0;
    
  3. Java 代码

    public class RedisLock {
    
        private final StringRedisTemplate stringRedisTemplate;
        private static final String KEY_PREFIX = "lock:";
        private final int RELEASE_TIME_IN_SECONDS;                 // 锁的过期时间
    
    // 创建两个Lua脚本,用静态代码块加载 lock 和 unlock 脚本
    private static final DefaultRedisScript<Long> LOCK_SCRIPT;
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
        static {
        LOCK_SCRIPT = new DefaultRedisScript<>();
        LOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/lock.lua"));
        LOCK_SCRIPT.setResultType(Long.class);
            UNLOCK_SCRIPT = new DefaultRedisScript<>();
            UNLOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/unlock.lua"));
            UNLOCK_SCRIPT.setResultType(Long.class);
        }
    
    
    // 重写锁操作,调用Lua脚本实现 “判断锁” 与 “删除锁” 的原子性操作,并实现可重入和避免误删
    public void lock(String key) {
        stringRedisTemplate.execute(                             // redis用execute函数调用lua脚本
                LOCK_SCRIPT,                                     // 声明调用的脚本对象
                Collections.singletonList(KEY_PREFIX + key),     // 传入key(锁)
                ID_PREFIX + Thread.currentThread().getId());     // 传入value(线程标识)
                String.valueOf(RELEASE_TIME_IN_MILLIS));         // 添加释放时间
    }
    
    public void unlock(String key) {
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + key),
                ID_PREFIX + Thread.currentThread().getId());
                String.valueOf(RELEASE_TIME_IN_MILLIS));
    }
    

Redisson 组件

一、概述

  1. 定义:Redis 基础上实现的分布式工具集合,一个 Java 驻内存数据网格(In-Memory Data Grid)
  2. 功能
    1. 常用对象:提供分布式的 Java 常用对象
    2. 分布式锁:提供分布式服务 (包括各种分布式锁的实现)
    3. 并发安全:解决集群模式下的并发安全问题
  3. 特点
    1. 可重入:线程可以二次获取自己加的锁,防止线程调用自己产生的死锁
    2. 可重试:线程请求锁失败后,应该能再次请求锁
    3. 超时续期:超时释放存在安全隐患,lua表达式只能保证不误删,但是超时释放后实际上有两个线程在锁的逻辑内
    4. 主从一致:主从同步之前,如果主机宕机就会出现死锁问题

二、使用入门

  1. 引入依赖

    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
    </dependency>
    
  2. 配置 Redission 客户端 (配置信息)

    @Configuration
    public class RedissonConfig {
    
        @Bean
        public RedissonClient redissonClient(){
            Config config = new Config();
            config.useSingleServer()          // 使用单节点模式连接 Redis
            .setAddress("redis://192.168.7.7:6379")
                .setPassword("123456");
            return Redisson.create(config);   // 创建RedissonClient对象
        }
    }
    
  3. 在 ServiceImpl 中注入 RedissonClient 依赖

    @AutoWired
    RedissonClient redissonClient;
    

四、锁重试 & WatchDog

  1. 锁重试机制
    1. 获取锁失败时可以设置重试等待时间
    2. 在等待时间内会周期性尝试获取锁
    3. 示例:lock.tryLock(100, 10, TimeUnit.SECONDS) - 等待100秒,每10秒重试一次
  2. WatchDog 机制
    1. 默认锁有效期为 30 秒
    2. 获取锁成功后,会启动一个后台线程,定期延长锁的有效期
    3. 释放锁时,WatchDog 线程自动停止

五、multiLock 原理

  1. 概念

    1. MultiLock 可以将多个 RLock 对象关联为一个整体
    2. 只有所有的锁都获取成功,才算获取成功
  2. 使用场景

    1. Redis 主从架构下保证锁的可靠性
    2. 多个资源需要同时加锁的场景
  3. 示例代码

    RLock lock1 = redissonClient1.getLock("lock1");
    RLock lock2 = redissonClient2.getLock("lock2");
    RLock multiLock = redissonClient.getMultiLock(lock1, lock2);
    try {
        // 同时获取多把锁
        multiLock.lock();
        // 业务逻辑
    } finally {
        // 释放所有锁
        multiLock.unlock();
    }
    


业务逻辑


创建资源


抢占资源 (Lua 脚本)

一、优惠券下单逻辑

二、代码实现 (Lua脚本)

--1. 参数列表
--1.1. 优惠券id
local voucherId = ARGV[1]
--1.2. 用户id
local userId = ARGV[2]
--1.3. 订单id
local orderId = ARGV[3]

--2. 数据key
--2.1. 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2. 订单key
local orderKey = 'seckill:order' .. voucherId

--3. 脚本业务
--3.1. 判断库存是否充足 get stockKey
if( tonumber( redis.call('get', stockKey) ) <= 0 ) then
return 1
end
--3.2. 判断用户是否下单 SISMEMBER orderKey userId
if( redis.call( 'sismember', orderKey, userId ) == 1 ) then
return 2
end
--3.4 扣库存: stockKey 的库存 -1
redis.call( 'incrby', stockKey, -1 )
--3.5 下单(保存用户): orderKey 集合中添加 userId
redis.call( 'sadd', orderKey, userId )
-- 3.6. 发送消息到队列中
redis.call( 'xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )

三、加载 Lua 脚本

  1. RedisScript 接口:用于绑定一个具体的 Lua 脚本
  2. DefaultRedisScript 实现类
    1. 定义:RedisScript 接口的实现类

    2. 功能:提前加载 Lua 脚本

    3. 示例

      // 创建Lua脚本对象
      private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
      
      // Lua脚本初始化 (通过静态代码块)
      static {
      SECKILL_SCRIPT = new DefaultRedisScript<>();
      SECKILL_SCRIPT.setLocation(new ClassPathResource("/path/to/lua_script.lua"));
      SECKILL_SCRIPT.setResultType(Long.class);
      }
      

四、执行 Lua 脚本

  1. 调用Lua脚本 API :StringRedisTemplate.execute( RedisScript<T> script, List<K> keys, Object… args )
  2. 示例
    1. 执行 ”下单脚本” (此时不需要 key,因为下单时只需要用 userId 和 voucherId 查询是否有锁)

      Long result = stringRedisTemplate.execute(
              SECKILL_SCRIPT,                                                        // 要执行的脚本
              Collections.emptyList(),                                               // KEY
              voucherId.toString(), userId.toString(), String.valueOf(orderId)       // VALUES
      );
      
    2. 执行 “unlock脚本”


用户下单

  1. 代码逻辑

  2. VoucherOrderServiceImpl 代码

    @Override
    public Result seckillVoucher( Long voucherId ) {
    // 1. 获取用户id/订单id
    Long userId = UserHolder.getUser().getId();
    long orderId = redisIdWorker.nextId("order");
    // 2. 执行Lua脚本
    Long luaResult = stringRedisTemplate.execute(
    SECKILL_SCRIPT,
    Collections.emptyList(),
    voucherId.toString(), userId.toString(), String.valueOf(orderId)
    );
    // 3. 反向判断lua脚本是否成功下单 (下单失败提前退出)
    if( luaResult != 0L ) {
    return Result.fail( r == 1 ? "库存不足":"同一用户不能重复下单");
    }
    // 4. 下单成功后的操作
    // 4.1. 获取订单id
    long orderId = redisIdWorker.nextId("order");
    // 4.2. 保存阻塞队列
    
    return Result.ok(orderId);
    }
    


原文地址:https://blog.csdn.net/Shad0wLEO/article/details/144442164

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!