Redis实现分布式锁
分布式锁的三个核心要素
1、加锁
2、解锁
3、锁超时
一、加锁
setnx加锁、del 解锁、expire设置锁超时时间
使用 SETNX 、 EXPIRE 和 DEL 操作进行解决还有些问题
假设一个场景中,某一个线程刚执行setnx,成功得到了锁。此时setnx刚执行成功,还未来得及执行expire命令,节点就挂掉了。此时这把锁就没有设置过期时间,别的线程就再也无法获得该锁。
所以SETNX 和 EXPIRE 其实是非原子性的
解决措施:
setnx指令是不支持传入超时时间的,而在Redis2.6.12版本上为set指令增加了可选参数, 用法如下:
SET key value [EX seconds][PX milliseconds] [NX|XX]
-
EX second: 设置键的过期时间为second秒;
-
PX millisecond:设置键的过期时间为millisecond毫秒;
-
NX:只在键不存在时,才对键进行设置操作;
-
XX:只在键已经存在时,才对键进行设置操作;
-
SET操作完成时,返回OK,否则返回nil。
为什么SET是原子性的?
因为Redis是单线程的,所以一个指令只能执行失败或成功。刚刚是SETNX 和 EXPIRE 两个指令,所以一个成功另一个有可能失败。
举个例子:
SET myKey myValue EX 30 NX //如果键myKey不存在,这条命令将设置键的值,并在30秒后自动过期。如果键已经存在,命令将不会执行任何操作。
用Java怎么写?
import redis.clients.jedis.Jedis; public class RedisSetExample { public static void main(String[] args) { // 连接到Redis服务器 Jedis jedis = new Jedis("localhost", 6379); // 键名和值 String key = "myKey"; String value = "myValue"; // 设置键的过期时间为30秒,并且只在键不存在时设置 String result = jedis.set(key, value, "EX", 30, "NX"); if ("OK".equals(result)) { System.out.println("键设置成功,将在30秒后过期。"); } else { System.out.println("键已存在,未进行设置。"); } // 关闭连接 jedis.close(); } }
二、解锁(防止解除其他人加的锁)
如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁;随后 A 执行完成,线程 A 使用 DEL 命令来释放锁,但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程 B 加的锁。
解决办法:
在del释放锁之前加一个判断,验证当前的锁是不是自己加的锁。
1.具体在加锁的时候把当前线程的id当做value,可生成一个 UUID 标识当前线程,在删除之前验证key对应的value是不是自己线程的id。
2.使用 lua 脚本做验证标识和解锁操作。
举个例子:
加锁的过程很简单,就是通过SET指令来设置值,成功则返回;否则就循环等待,在timeout时间内仍未获取到锁,则获取失败。
import redis.clients.jedis.Jedis; import java.util.UUID; public class RedisLock { private Jedis jedis; private String lockKey; // 锁的键名 private String uuid; // 当前线程的UUID public RedisLock(Jedis jedis, String lockKey) { this.jedis = jedis; this.lockKey = lockKey; this.uuid = LockUtil.generateUUID(); } // 尝试获取锁 public boolean tryLock(int expireTimeInSeconds) { // 使用SET命令设置键的值和过期时间,并使用NX选项确保只有在键不存在时才设置 return jedis.set(lockKey, uuid, "NX", "EX", expireTimeInSeconds).equals("OK"); } // 释放锁 public void unlock() { // 使用Lua脚本来确保原子性地检查并删除锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Long result = (Long) jedis.eval(script, 1, lockKey, uuid); if (result == 1) { System.out.println("锁已释放。"); } else { System.out.println("尝试释放的锁不是当前线程的锁,或者锁已经被其他线程释放。"); } } }
/** 解锁我们通过jedis.eval来执行一段LUA就可以。将锁的Key键和生成的字符串当做参数传进来。 * 解锁 * @param id * @return */ public boolean unlock(String id){ Jedis jedis = jedisPool.getResource(); String script = "if redis.call('get',KEYS[1]) == ARGV[1] then" + " return redis.call('del',KEYS[1]) " + "else" + " return 0 " + "end"; try { Object result = jedis.eval(script, Collections.singletonList(lock_key), Collections.singletonList(id)); if("1".equals(result.toString())){ return true; } return false; }finally { jedis.close(); } }
三、超时解锁导致并发
如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行。
A、B 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:
-
将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。
-
为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。
四、不可重入
当线程在持有锁的情况下再次请求加锁,如果一个锁支持一个线程多次加锁,那么这个锁就是可重入的。如果一个不可重入锁被再次加锁,由于该锁已经被持有,再次加锁会失败。Redis 可通过对锁进行重入计数,加锁时加 1,解锁时减 1,当计数归 0 时释放锁。
五、无法等待锁释放
上述命令执行都是立即返回的,如果客户端可以等待锁释放就无法使用。
-
可以通过客户端轮询的方式解决该问题,当未获取到锁时,等待一段时间重新获取锁,直到成功获取锁或等待超时。这种方式比较消耗服务器资源,当并发量比较大时,会影响服务器的效率。
-
另一种方式是使用 Redis 的发布订阅功能,当获取锁失败时,订阅锁释放消息,获取锁成功后释放时,发送锁释放消息。 具体实现参考:https://xiaomi-info.github.io/2019/12/17/Redis-distributed-lock/
原文地址:https://blog.csdn.net/yuanhantin/article/details/140558680
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!