Redis 实现基础分布式锁、自动续期与可重入分布式锁
1.基础分布式锁实现
1.1代码如下:
/**
 * Basic Redis-backed lock: one key per lock, one caller-supplied token per owner.
 *
 * <p>Acquisition is a single {@code SET key value NX PX ttl}; release is a Lua
 * compare-and-delete so that only the caller whose token is still stored under
 * the key can remove it. Neither operation blocks or retries; Redis errors are
 * logged and reported as {@code false}.
 */
@Slf4j
@Component
public class RedisDistributedLock {

    /** Deletes KEYS[1] only when its current value equals ARGV[1]; yields 1 on delete, 0 otherwise. */
    private static final String UNLOCK_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";

    /**
     * Shared script instance. Building it once (instead of on every unlock call)
     * lets the script object be reused rather than re-created per invocation.
     */
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT =
            new DefaultRedisScript<>(UNLOCK_LUA, Long.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Tries once to take the lock.
     *
     * @param key        Redis key that represents the lock
     * @param value      token identifying the caller; must be passed again to {@link #unlock}
     * @param expireTime time-to-live of the key, in milliseconds
     * @return {@code true} if the key was absent and has been set; {@code false} if it
     *         already existed or the Redis call threw
     */
    public boolean lock(String key, String value, long expireTime) {
        try {
            // setIfAbsent with a timeout sets the value and its TTL in one command.
            Boolean acquired = redisTemplate.opsForValue()
                    .setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
            return Boolean.TRUE.equals(acquired);
        } catch (Exception e) {
            log.error("获取锁失败", e);
            return false;
        }
    }

    /**
     * Releases the lock if, and only if, it is still held with the given token.
     *
     * @param key   Redis key that represents the lock
     * @param value token that was used to acquire the lock
     * @return {@code true} if the key was deleted; {@code false} if the stored value did
     *         not match, the key was gone, or the Redis call threw
     */
    public boolean unlock(String key, String value) {
        try {
            // The comparison and the delete run inside one Lua script, so no other
            // client can take the lock between the check and the DEL.
            Long deleted = redisTemplate.execute(UNLOCK_SCRIPT,
                    Collections.singletonList(key), value);
            return deleted != null && deleted == 1L;
        } catch (Exception e) {
            log.error("释放锁失败", e);
            return false;
        }
    }
}
2.进阶版分布式锁(支持自动续期)
2.1代码如下
/**
 * Redis lock whose time-to-live is extended in the background while it is held.
 *
 * <p>After a successful acquire, a periodic task re-applies the TTL as long as the
 * key still stores the owner's token. The task stops itself when the token no
 * longer matches or a renewal attempt throws, and is cancelled explicitly by
 * {@link #unlockWithRenewal}.
 *
 * <p>Renewal tasks are tracked per (key, token) pair, so cancelling or losing one
 * owner's renewal never touches a task that belongs to a different token on the
 * same key.
 */
@Slf4j
@Component
public class AdvancedRedisLock {

    /** Re-applies the TTL (ARGV[2], ms) to KEYS[1] only while it still holds ARGV[1]. */
    private static final String RENEW_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
            "else return 0 end";

    /** Deletes KEYS[1] only while it still holds ARGV[1]. */
    private static final String UNLOCK_LUA =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";

    private static final DefaultRedisScript<Long> RENEW_SCRIPT =
            new DefaultRedisScript<>(RENEW_LUA, Long.class);

    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT =
            new DefaultRedisScript<>(UNLOCK_LUA, Long.class);

    /** Active renewal tasks, indexed by {@link #taskId(String, String)}. */
    private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private ScheduledExecutorService scheduler;

    /**
     * Tries once to take the lock and, on success, starts its renewal task.
     *
     * @param key        Redis key that represents the lock
     * @param value      token identifying the owner
     * @param expireTime TTL in milliseconds; also the value re-applied on every renewal
     * @return {@code true} if the lock was acquired and renewal was scheduled;
     *         {@code false} if the key was already set or any step threw
     */
    public boolean lockWithRenewal(String key, String value, long expireTime) {
        try {
            if (!acquire(key, value, expireTime)) {
                return false;
            }
        } catch (Exception e) {
            log.error("获取锁失败", e);
            return false;
        }
        try {
            scheduleRenewal(key, value, expireTime);
            return true;
        } catch (Exception e) {
            log.error("获取锁失败", e);
            // The key is already set in Redis; since failure is reported to the
            // caller, give the lock back instead of leaving it held until the TTL ends.
            releaseQuietly(key, value);
            return false;
        }
    }

    /**
     * Stops the renewal task of this owner and releases the lock.
     *
     * @param key   Redis key that represents the lock
     * @param value token that was used to acquire the lock
     * @return {@code true} if the key was deleted; {@code false} if the token did not
     *         match, the key was gone, or the Redis call threw
     */
    public boolean unlockWithRenewal(String key, String value) {
        try {
            cancelRenewal(key, value);
            return release(key, value);
        } catch (Exception e) {
            log.error("释放锁失败", e);
            return false;
        }
    }

    /** Single {@code SET key value NX PX ttl}; true only when the key was absent. */
    private boolean acquire(String key, String value, long expireTime) {
        Boolean acquired = redisTemplate.opsForValue()
                .setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(acquired);
    }

    /** Compare-and-delete through Lua; true only when this token's key was removed. */
    private boolean release(String key, String value) {
        Long deleted = redisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(key), value);
        return deleted != null && deleted == 1L;
    }

    /** Best-effort release used on the failure path; never throws. */
    private void releaseQuietly(String key, String value) {
        try {
            release(key, value);
        } catch (Exception e) {
            log.error("释放锁失败", e);
        }
    }

    /**
     * Starts the periodic renewal for one owner.
     *
     * <p>The period is one third of the TTL (at least 1 ms), so the key is refreshed
     * well before it can expire whatever {@code expireTime} the caller chose.
     */
    private void scheduleRenewal(String key, String value, long expireTime) {
        long period = Math.max(1L, expireTime / 3);
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
            try {
                Long renewed = redisTemplate.execute(RENEW_SCRIPT,
                        Collections.singletonList(key),
                        value, String.valueOf(expireTime));
                if (renewed == null || renewed != 1L) {
                    // The key no longer carries this token: nothing left to renew.
                    cancelRenewal(key, value);
                }
            } catch (Exception e) {
                log.error("续期失败", e);
                cancelRenewal(key, value);
            }
        }, period, period, TimeUnit.MILLISECONDS);

        ScheduledFuture<?> previous = renewalTasks.put(taskId(key, value), future);
        if (previous != null) {
            // Same key and token registered twice: do not leave the older task running.
            previous.cancel(false);
        }
    }

    /** Cancels and forgets the renewal task of the given owner, if one is registered. */
    private void cancelRenewal(String key, String value) {
        ScheduledFuture<?> future = renewalTasks.remove(taskId(key, value));
        if (future != null) {
            future.cancel(false);
        }
    }

    /** Map key for a (key, token) pair; the length prefix keeps distinct pairs distinct. */
    private static String taskId(String key, String value) {
        return key.length() + ":" + key + value;
    }
}
3.使用装饰器模式实现可重入锁
3.1代码如下
/**
 * Adds per-thread reentrancy on top of {@link AdvancedRedisLock}.
 *
 * <p>Each thread keeps a hold count per key. Only the first acquire and the last
 * release of a key reach Redis; nested calls on the same thread just adjust the
 * count. The count is looked up by key alone, so the token passed to nested calls
 * is not inspected.
 */
@Slf4j
@Component
public class ReentrantRedisLock {

    /** Hold counts of the current thread, keyed by lock key. */
    private final ThreadLocal<Map<String, Integer>> locksHolder =
            ThreadLocal.withInitial(HashMap::new);

    @Autowired
    private AdvancedRedisLock redisLock;

    /**
     * Acquires the lock, or increments the hold count if this thread already holds it.
     *
     * @param key        Redis key that represents the lock
     * @param value      owner token, used only when the lock is taken for the first time
     * @param expireTime TTL in milliseconds, used only when the lock is taken for the first time
     * @return {@code true} if the thread holds the lock after the call
     */
    public boolean lock(String key, String value, long expireTime) {
        Map<String, Integer> counts = locksHolder.get();
        Integer held = counts.get(key);
        if (held != null && held > 0) {
            // Re-entrant call: no Redis round trip.
            counts.put(key, held + 1);
            return true;
        }
        if (redisLock.lockWithRenewal(key, value, expireTime)) {
            counts.put(key, 1);
            return true;
        }
        clearIfEmpty(counts);
        return false;
    }

    /**
     * Decrements the hold count and releases the lock in Redis when it reaches zero.
     *
     * @param key   Redis key that represents the lock
     * @param value owner token, used only for the final release
     * @return {@code false} if this thread holds no count for {@code key}; {@code true}
     *         after a nested release; otherwise the result of the Redis release
     */
    public boolean unlock(String key, String value) {
        Map<String, Integer> counts = locksHolder.get();
        Integer held = counts.get(key);
        if (held == null) {
            clearIfEmpty(counts);
            return false;
        }
        if (held > 1) {
            // Still inside an outer hold: only update the count.
            counts.put(key, held - 1);
            return true;
        }
        counts.remove(key);
        clearIfEmpty(counts);
        return redisLock.unlockWithRenewal(key, value);
    }

    /**
     * Drops the thread-local map once it holds nothing, so an empty map does not
     * stay attached to a long-lived (for example pooled) thread.
     */
    private void clearIfEmpty(Map<String, Integer> counts) {
        if (counts.isEmpty()) {
            locksHolder.remove();
        }
    }
}
4. 使用AOP实现分布式锁注解
4.1代码如下:
/**
 * Marks a method that must run while holding a distributed lock.
 *
 * <p>The annotation is retained at runtime and may only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLock {

    /** Lock key; required, there is no default. */
    String key();

    /** Lock expiry; defaults to 30000 (the other snippets pass this value as milliseconds). */
    long expireTime() default 30_000L;

    /** How long to wait for the lock; defaults to 5000. */
    long waitTime() default 5_000L;
}
@Aspect
@Component
@Slf4j
public class DistributedLockAspect {
@Autowired
private ReentrantRedisLock redisLock;
@Around("@annotation(distributedLock)")
public Object around(ProceedingJoinPoint point, DistributedLock distributedLock)
throws Throwable {
String key = distributedLock.key();
String value = UUID.randomUUID().toString();
// 尝试获取锁
long waitTime = distributedLock.waitTime();
long startTime = System.currentTimeMillis();
while (!redisLock.lock(key, value, distributedLock.expireTime())) {
if (System.currentTimeMillis() - startTime > waitTime) {
throw new RuntimeException("获取锁超时");
}
Thread.sleep(100);
}
try {
return point.proceed();
} finally {
redisLock.unlock(key, value);
}
}
}
5.使用示例
5.1代码如下
/**
 * Usage examples for the distributed lock: once through the annotation, once by
 * calling the lock API directly.
 */
@Service
@Slf4j
public class OrderService {

    @Autowired
    private ReentrantRedisLock redisLock;

    /**
     * Annotation-driven variant.
     *
     * <p>The key is written as a SpEL-style expression over {@code orderId}; it only
     * becomes a per-order key if the component handling {@code @DistributedLock}
     * evaluates the expression.
     *
     * @param orderId identifier of the order to process
     */
    @DistributedLock(key = "'order:' + #orderId", expireTime = 30000)
    public void processOrder(String orderId) {
        try {
            // Order handling goes here; the sleep stands in for real work.
            log.info("Processing order: {}", orderId);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Manual variant: acquire first, then guard the work with try/finally.
     *
     * <p>The release sits in a {@code finally} that is entered only after the lock
     * was actually obtained, so a failed attempt never issues an unlock.
     *
     * @param orderId identifier of the order to process
     */
    public void manualLockExample(String orderId) {
        String key = "order:" + orderId;
        String value = UUID.randomUUID().toString();

        if (!redisLock.lock(key, value, 30000)) {
            log.warn("Failed to acquire lock for order: {}", orderId);
            return;
        }
        try {
            // Order handling goes here.
            log.info("Processing order: {}", orderId);
        } finally {
            redisLock.unlock(key, value);
        }
    }
}
6.配置如下
6.1代码如下
/**
 * Provides the scheduler bean that is injected wherever a
 * {@code ScheduledExecutorService} is required.
 */
@Configuration
public class RedisLockConfig {

    /**
     * Builds a scheduled pool with one thread per available processor. Threads are
     * daemons and are named {@code redis-lock-renewal-N}.
     *
     * @return the scheduler bean
     */
    @Bean
    public ScheduledExecutorService scheduledExecutorService() {
        ThreadFactoryBuilder renewalThreads = new ThreadFactoryBuilder()
                .setDaemon(true)
                .setNameFormat("redis-lock-renewal-%d");
        int workerCount = Runtime.getRuntime().availableProcessors();
        return Executors.newScheduledThreadPool(workerCount, renewalThreads.build());
    }
}
原文地址:https://blog.csdn.net/weixin_45069056/article/details/144307546
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!