⭐Redis - 手动实现分布式锁 & Redisson 的使用
概述
-
定义
:分布式系统或集群模式下,多进程或多节点之间 “可见” 并且 “互斥” 的锁机制 -
功能
:确保同一时刻只有一个进程或节点能够获取某项资源的访问权 -
特点
- 互斥
- 高可用
- 多进程可见
- 高并发 (高性能)
- 安全性 (避免死锁问题)
-
常见的分布式锁
MySQL Redis Zookeeper 互斥 MySQL 本身的互斥锁机制 利用 setnx 互斥命令 利用节点的唯一性和有序性实现互斥 高可用 好 好 好 高性能 一般 好 一般 (强调一致性) 安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放
Redis 分布式锁
一、核心思路
运行逻辑
- 线程进入时,利用 redis 的setNx 方法添加 key 作为逻辑锁
- 返回结果 == 1 → 当前线程抢到了锁,继续执行业务逻辑后删除 key,逻辑上释放锁
- 返回结果 == 0 → 等待一定时间后重新尝试获取锁
实现途径
:setNx 方法(添加代表锁的 key,如果没有当前 key 则添加当前 key-value 并返回 1,已有当前 key 则返回 0)最佳实践
- 在 Util 包下创建一个分布式锁的类,专门供业务模块(xxxService)获取锁和释放锁
- 在 ServiceImpl 的代码中调用 Util 中的锁工具并手动 ”上锁 / 释放锁”
二、获取锁
功能
:如果 Redis 中不存在当前锁,则在 Redis 中写入锁名 / 线程名 / 超时释放时间- ⭐
Redis命令
:SET myLockName myThreadName NX EX myExpireTime - ⭐
核心代码
:stringRedisTemplate.opsForValue().setIfAbsent( KEY_PREFIX + keyName, threadId, time, TimeUnit.SECONDS )
三、释放锁
功能
:逻辑上释放分布式锁,让该服务不再占用资源实现流程
- 判断 Redis 中存在对此业务的锁
- 存在锁 → 判断这个锁是否是当前线程获取
- 锁属于当前线程 → 释放锁(删除 redis 中的 key-value)
- 锁不属于当前线程 → 拒绝释放锁
- 不存在锁 → 释放锁失败
- 存在锁 → 判断这个锁是否是当前线程获取
- 判断 Redis 中存在对此业务的锁
Redis命令
:DEL myLockName
四、常见问题
误删问题
问题引入
- 持有锁的线程 1 出现阻塞,导致锁超时自动释放
- 线程2 请求获取锁,成功获得锁并开始执行业务逻辑
- 线程 1 唤醒,继续执行业务逻辑并删除锁 (误删其他线程的锁)
核心思路
:key 作为锁,value 用来标识是属于哪个进程的锁解决方案
- 存入锁时,value 中放入当前线程的标识(声明锁的主人)
- 删除锁时,判断当前锁的 value 是否包含当前线程的标识(检查是否由主人自己解锁)
- 锁属于当前线程 → 删除
- 锁不属于当前线程 → 拒绝删除
原子性问题
-
问题引入
- 线程 1 持有锁并执行业务逻辑后,已经判断锁属于自己
- 线程1准备删除锁,但是此时锁过期
- 线程2请求获取锁,获取锁成功
- 线程1继续执行删除锁逻辑 (误删其他线程的锁)
-
核心思路
:通过 Lua 脚本保证 “判断锁” 与 “删除锁” 的原子性 -
Redis 的调用函数
:redis.call('命令名称', 'key', '其它参数', ...) -
代码实现
- 目标:保证 unlock 操作的判断锁和删除锁是原子性操作 ( Lua 脚本 )
-- 定义 Lua 脚本,用于安全释放分布式锁 local key = KEYS[1] -- 锁的键名 local threadId = ARGV[1] -- 当前线程标识 -- 判断锁是否属于当前线程 if (redis.call('GET', key) == threadId) then return redis.call('DEL', key) -- 锁属于当前线程 -> 删除锁 end return 0 -- 锁不属于当前线程 -> 返回失败
可重入问题
-
目标
:对于当前获取了锁的进程,可以再次获取自己的锁 -
原理
-
借助底层的一个 voaltile 的一个 state 变量来记录重入的状态
-
没有进程持有当前锁(state=0) ⇒ 线程请求当前锁(state=1)
-
持有这把锁的线程再次请求当前锁 ⇒ state++
-
持有这把锁的线程释放当前锁 ⇒ state--
-
state ==0 ⇒ 释放当前锁
-
-
参数
- KEYS[1]:锁名称
- ARGV[1]:锁失效时间
- ARGV[2]:id + “:” + threadId ,表示持有锁的进程 (field)
过期问题
-
问题引入:如果任务执行时间超过锁的过期时间,锁可能会被误删
-
解决方案:看门狗机制,即在锁的过期时间内,通过后台线程周期性地延长锁的有效期
-
代码实现
// 全局调度器,负责分配任务 private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10); // 使用 ThreadLocal 保存当前线程的续约任务 private final ThreadLocal<ScheduledFuture<?>> renewalTask = new ThreadLocal<>(); public void lockWithWatchdog() { lock(); // 启动定时任务 ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> { stringRedisTemplate.expire(KEY_PREFIX + name, RELEASE_TIME_IN_SECONDS, TimeUnit.SECONDS); }, // 续约操作 RELEASE_TIME_IN_SECONDS / 2, // initialDelay: 延迟多长时间后首次执行任务 RELEASE_TIME_IN_SECONDS / 2, // period: 两次任务之间的时间间隔 TimeUnit.SECONDS); // 时间单位:这里为秒 renewalTask.set(task); // 将当前线程的续约任务保存到 ThreadLocal 中 } public void unlockWithWatchdog() { // 解锁 unlock(); // 从 ThreadLocal 中获取当前线程的续约任务 ScheduledFuture<?> task = renewalTask.get(); if (task != null) { task.cancel(false); // 取消当前续约任务 renewalTask.remove(); // 清理 ThreadLocal,防止内存泄漏 } }
解决方案
-
"获取锁" 脚本(lock.lua)
local key = KEYS[1]; -- key:锁的id local threadId = ARGV[1]; -- threadId:线程的id local releaseTime = tonumber(ARGV[2]); -- releaseTime:锁的自动释放时间(失效时间) -- 锁(key)不存在 if( redis.call('exists', key) == 0 ) then redis.call('hset', key, threadId, 1); -- 成功获取锁 redis.call('pexpire', key, releaseTime); -- 设置有效期 return 1; -- 返回成功 end; -- 锁(key)已存在 && 锁(threadId)属于当前线程 if( redis.call('hexists', key, threadId ) == 1 ) then redis.call( 'hincrby', key, threadId, 1 ); -- 当前线程的锁 +1 redis.call( 'pexpire', key, releaseTime ); -- 更新有效期 return 1; -- 返回成功 end; -- 锁存在 && 锁不属于当前线程,返回失败(当前锁已被占用) return 0;
-
”释放锁“ 脚本(unlock.lua)
local key = KEYS[1]; -- key:锁的id local threadId = ARGV[1]; -- threadId:线程的id local releaseTime = tonumber(ARGV[2]); -- releaseTime:锁的自动释放时间(失效时间) -- 当前锁不属于当前进程 if( redis.call('HEXISTS', key, threadId ) == 0 ) then return 0; end; -- 当前锁属于当前进程, 重入次数 -1 local count = redis.call('HINCRBY', key, threadId, -1 ); -- 重入次数已经为0,释放锁 if( count == 0 ) then redis.call('DEL', key); return 1; end; -- 重入次数还不为0,更新有效期 redis.call('PEXPIRE', key, releaseTime); return 0;
-
Java 代码
public class RedisLock { private final StringRedisTemplate stringRedisTemplate; private static final String KEY_PREFIX = "lock:"; private final int RELEASE_TIME_IN_SECONDS; // 锁的过期时间 // 创建两个Lua脚本,用静态代码块加载 lock 和 unlock 脚本 private static final DefaultRedisScript<Long> LOCK_SCRIPT; private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { LOCK_SCRIPT = new DefaultRedisScript<>(); LOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/lock.lua")); LOCK_SCRIPT.setResultType(Long.class); UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("/path/to/unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } // 重写锁操作,调用Lua脚本实现 “判断锁” 与 “删除锁” 的原子性操作,并实现可重入和避免误删 public void lock(String key) { stringRedisTemplate.execute( // redis用execute函数调用lua脚本 LOCK_SCRIPT, // 声明调用的脚本对象 Collections.singletonList(KEY_PREFIX + key), // 传入key(锁) ID_PREFIX + Thread.currentThread().getId()); // 传入value(线程标识) String.valueOf(RELEASE_TIME_IN_MILLIS)); // 添加释放时间 } public void unlock(String key) { stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + key), ID_PREFIX + Thread.currentThread().getId()); String.valueOf(RELEASE_TIME_IN_MILLIS)); }
Redisson 组件
一、概述
定义
:Redis 基础上实现的分布式工具集合,一个 Java 驻内存数据网格(In-Memory Data Grid)功能
- 常用对象:提供分布式的 Java 常用对象
- 分布式锁:提供分布式服务 (包括各种分布式锁的实现)
- 并发安全:解决集群模式下的并发安全问题
特点
- 可重入:线程可以二次获取自己加的锁,防止线程调用自己产生的死锁
- 可重试:线程请求锁失败后,应该能再次请求锁
- 超时续期:超时释放存在安全隐患,lua表达式只能保证不误删,但是超时释放后实际上有两个线程在锁的逻辑内
- 主从一致:主从同步之前,如果主机宕机就会出现死锁问题
二、使用入门
-
引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.6</version> </dependency>
-
配置 Redission 客户端 (配置信息)
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ Config config = new Config(); config.useSingleServer() // 使用单节点模式连接 Redis .setAddress("redis://192.168.7.7:6379") .setPassword("123456"); return Redisson.create(config); // 创建RedissonClient对象 } }
-
在 ServiceImpl 中注入 RedissonClient 依赖
@AutoWired RedissonClient redissonClient;
四、锁重试 & WatchDog
锁重试机制
- 获取锁失败时可以设置重试等待时间
- 在等待时间内会周期性尝试获取锁
- 示例:
lock.tryLock(100, 10, TimeUnit.SECONDS)
- 等待100秒,每10秒重试一次
WatchDog 机制
- 默认锁有效期为 30 秒
- 获取锁成功后,会启动一个后台线程,定期延长锁的有效期
- 释放锁时,WatchDog 线程自动停止
五、multiLock 原理
-
概念
- MultiLock 可以将多个 RLock 对象关联为一个整体
- 只有所有的锁都获取成功,才算获取成功
-
使用场景
- Redis 主从架构下保证锁的可靠性
- 多个资源需要同时加锁的场景
-
示例代码
RLock lock1 = redissonClient1.getLock("lock1"); RLock lock2 = redissonClient2.getLock("lock2"); RLock multiLock = redissonClient.getMultiLock(lock1, lock2); try { // 同时获取多把锁 multiLock.lock(); // 业务逻辑 } finally { // 释放所有锁 multiLock.unlock(); }
业务逻辑
创建资源
抢占资源 (Lua 脚本)
一、优惠券下单逻辑
二、代码实现 (Lua脚本)
--1. 参数列表
--1.1. 优惠券id
local voucherId = ARGV[1]
--1.2. 用户id
local userId = ARGV[2]
--1.3. 订单id
local orderId = ARGV[3]
--2. 数据key
--2.1. 库存key
local stockKey = 'seckill:stock:' .. voucherId
--2.2. 订单key
local orderKey = 'seckill:order' .. voucherId
--3. 脚本业务
--3.1. 判断库存是否充足 get stockKey
if( tonumber( redis.call('get', stockKey) ) <= 0 ) then
return 1
end
--3.2. 判断用户是否下单 SISMEMBER orderKey userId
if( redis.call( 'sismember', orderKey, userId ) == 1 ) then
return 2
end
--3.4 扣库存: stockKey 的库存 -1
redis.call( 'incrby', stockKey, -1 )
--3.5 下单(保存用户): orderKey 集合中添加 userId
redis.call( 'sadd', orderKey, userId )
-- 3.6. 发送消息到队列中
redis.call( 'xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
三、加载 Lua 脚本
RedisScript 接口
:用于绑定一个具体的 Lua 脚本DefaultRedisScript 实现类
-
定义:RedisScript 接口的实现类
-
功能:提前加载 Lua 脚本
-
示例
// 创建Lua脚本对象 private static final DefaultRedisScript<Long> SECKILL_SCRIPT; // Lua脚本初始化 (通过静态代码块) static { SECKILL_SCRIPT = new DefaultRedisScript<>(); SECKILL_SCRIPT.setLocation(new ClassPathResource("/path/to/lua_script.lua")); SECKILL_SCRIPT.setResultType(Long.class); }
-
四、执行 Lua 脚本
调用Lua脚本 API
:StringRedisTemplate.execute( RedisScript<T> script, List<K> keys, Object… args )示例
-
执行 ”下单脚本” (此时不需要 key,因为下单时只需要用 userId 和 voucherId 查询是否有锁)
Long result = stringRedisTemplate.execute( SECKILL_SCRIPT, // 要执行的脚本 Collections.emptyList(), // KEY voucherId.toString(), userId.toString(), String.valueOf(orderId) // VALUES );
-
执行 “unlock脚本”
-
用户下单
-
代码逻辑
-
VoucherOrderServiceImpl 代码
@Override public Result seckillVoucher( Long voucherId ) { // 1. 获取用户id/订单id Long userId = UserHolder.getUser().getId(); long orderId = redisIdWorker.nextId("order"); // 2. 执行Lua脚本 Long luaResult = stringRedisTemplate.execute( SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString(), String.valueOf(orderId) ); // 3. 反向判断lua脚本是否成功下单 (下单失败提前退出) if( luaResult != 0L ) { return Result.fail( r == 1 ? "库存不足":"同一用户不能重复下单"); } // 4. 下单成功后的操作 // 4.1. 获取订单id long orderId = redisIdWorker.nextId("order"); // 4.2. 保存阻塞队列 return Result.ok(orderId); }
原文地址:https://blog.csdn.net/Shad0wLEO/article/details/144442164
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!