Redisson中红锁(RedLock)的实现
一、什么是红锁
当在单点redis中实现redis锁时,一旦redis服务器宕机,则无法进行锁操作。因此会考虑将redis配置为主从结 构,但在主从结构中,数据复制是异步实现的。假设在主从结构中,master会异步将数据复制到slave中,一旦某 个线程持有了锁,在还没有将数据复制到slave时,master宕机。则slave会被提升为master,但被提升为slave的 master中并没有之前线程的锁信息,那么其他线程则又可以重新加锁。
二、RedLock算法原理
redlock是一种基于多节点redis实现分布式锁的算法,可以有效解决redis单点故障的问题。官方建议搭建五台 redis服务器对redlock算法进行实现。
在redis官网中,对于redlock算法的实现思想也做了详细的介绍。
地址:https://redis.io/topics/distlock
整个实现过程分为五步:
- 1)记录获取锁前的当前时间
- 2)使用相同的key,value获取所有redis实例中的锁,并且设置获取锁的时间要远远小于锁自动释放的时间。假设 锁自动释放时间是10秒,则获取时间应在5-50毫秒之间。通过这种方式避免客户端长时间等待一个已经关闭的实 例,如果一个实例不可用了,则尝试获取下一个实例。
- 3)客户端通过获取所有实例的锁后的时间减去第一步的时间,得到的差值要小于锁自动释放时间,避免拿到一个 已经过期的锁。并且要有超过半数的redis实例成功获取到锁,才算最终获取锁成功。如果不是超过半数,有可能 出现多个客户端重复获取到锁,导致锁失效。
- 4)当已经获取到锁,那么它的真正失效时间应该为:过期时间-第三步的差值。
- 5)如果客户端获取锁失败,则在所有redis实例中释放掉锁。为了保证更高效的获取锁,还可以设置重试策略,在 一定时间后重新尝试获取锁,但不能是无休止的,要设置重试次数。
三、RedLock用法示例
1)新建配置类
@Configuration
public class RedissonRedLockConfig {
public RedissonRedLock initRedissonClient(String lockKey){
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.200.150:7000").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.200.150:7001").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.200.150:7002").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
Config config4 = new Config();
config4.useSingleServer().setAddress("redis://192.168.200.150:7003").setDatabase(0);
RedissonClient redissonClient4 = Redisson.create(config4);
Config config5 = new Config();
config5.useSingleServer().setAddress("redis://192.168.200.150:7004").setDatabase(0);
RedissonClient redissonClient5 = Redisson.create(config5);
RLock rLock1 = redissonClient1.getLock(lockKey);
RLock rLock2 = redissonClient2.getLock(lockKey);
RLock rLock3 = redissonClient3.getLock(lockKey);
RLock rLock4 = redissonClient4.getLock(lockKey);
RLock rLock5 = redissonClient5.getLock(lockKey);
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1,rLock2,rLock3,rLock4,rLock5);
return redissonRedLock;
}
}
2)新建测试类,完成加锁与解锁操作
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedLockTest {
@Autowired
private RedissonRedLockConfig redissonRedLockConfig;
@Test
public void easyLock(){
//模拟多个10个客户端
for (int i=0;i<10;i++) {
Thread thread = new Thread(new RedLockTest.RedLockRunnable());
thread.start();
}
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
private class RedLockRunnable implements Runnable {
@Override
public void run() {
RedissonRedLock redissonRedLock = redissonRedLockConfig.initRedissonClient("demo");
try {
boolean lockResult = redissonRedLock.tryLock(100, 10, TimeUnit.SECONDS);
if (lockResult){
System.out.println("获取锁成功");
TimeUnit.SECONDS.sleep(3);
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
redissonRedLock.unlock();
System.out.println("释放锁");
}
}
}
}
redissonRedLock加锁源码分析
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
/**
* 1. 允许加锁失败节点个数限制(N-(N/2+1)),当前假设五个节点,则允许失败节点数为2
*/
int failedLocksLimit = failedLocksLimit();
/**
* 2. 遍历所有节点执行lua加锁,用于保证原子性
*/
List<RLock> acquiredLocks = new ArrayList<>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
/**
* 3.对节点尝试加锁
*/
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisResponseTimeoutException e) {
// 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (Exception e) {
// 抛出异常表示获取锁失败
lockAcquired = false;
}
if (lockAcquired) {
/**
*4. 如果获取到锁则添加到已获取锁集合中
*/
acquiredLocks.add(lock);
} else {
/**
* 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
* 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
* 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
*/
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
/**
* 6.计算从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则申请锁失败,返回false
*/
if (remainTime != -1) {
remainTime -= System.currentTimeMillis() - time;
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
/**
* 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
*/
return true;
}
原文地址:https://blog.csdn.net/qq_46248151/article/details/145169861
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!