自学内容网 自学内容网

redis实现基础分布式锁,自动续期,可重入分布式锁

1.基础分布式锁实现

1.1代码如下:
@Slf4j
@Component
public class RedisDistributedLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final long DEFAULT_EXPIRE = 30000L; // 30秒
    private static final long WAIT_INTERVAL = 100L; // 100毫秒
    
    /**
     * 获取锁
     */
    public boolean lock(String key, String value, long expireTime) {
        try {
            // 使用SET命令with NX and PX options
            return Boolean.TRUE.equals(redisTemplate.opsForValue()
                .setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS));
        } catch (Exception e) {
            log.error("获取锁失败", e);
            return false;
        }
    }
    
    /**
     * 释放锁
     */
    public boolean unlock(String key, String value) {
        try {
            // 使用Lua脚本确保原子性
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                          "return redis.call('del', KEYS[1]) " +
                          "else return 0 end";
            
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            redisScript.setScriptText(script);
            redisScript.setResultType(Long.class);
            
            Long result = redisTemplate.execute(redisScript, 
                Collections.singletonList(key), value);
            return result != null && result == 1L;
        } catch (Exception e) {
            log.error("释放锁失败", e);
            return false;
        }
    }
}

2.进阶版分布式锁(支持自动续期)

2.1代码如下
@Slf4j
@Component
public class AdvancedRedisLock {

    @Autowired
    private StringRedisTemplate redisTemplate;
    
    private static final long DEFAULT_EXPIRE = 30000L;
    private static final long RENEWAL_INTERVAL = 10000L; // 续期间隔
    private final Map<String, ScheduledFuture<?>> renewalTasks = new ConcurrentHashMap<>();
    
    @Autowired
    private ScheduledExecutorService scheduler;
    
    /**
     * 获取锁并自动续期
     */
    public boolean lockWithRenewal(String key, String value, long expireTime) {
        try {
            boolean locked = lock(key, value, expireTime);
            if (locked) {
                // 启动续期任务
                scheduleRenewal(key, value, expireTime);
            }
            return locked;
        } catch (Exception e) {
            log.error("获取锁失败", e);
            return false;
        }
    }
    
    /**
     * 释放锁并取消续期
     */
    public boolean unlockWithRenewal(String key, String value) {
        try {
            // 取消续期任务
            cancelRenewal(key);
            return unlock(key, value);
        } catch (Exception e) {
            log.error("释放锁失败", e);
            return false;
        }
    }
    
    /**
     * 调度续期任务
     */
    private void scheduleRenewal(String key, String value, long expireTime) {
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
            try {
                String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                              "return redis.call('pexpire', KEYS[1], ARGV[2]) " +
                              "else return 0 end";
                
                DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
                redisScript.setScriptText(script);
                redisScript.setResultType(Long.class);
                
                Long result = redisTemplate.execute(redisScript,
                    Collections.singletonList(key),
                    value, String.valueOf(expireTime));
                
                if (result == null || result != 1L) {
                    cancelRenewal(key);
                }
            } catch (Exception e) {
                log.error("续期失败", e);
                cancelRenewal(key);
            }
        }, RENEWAL_INTERVAL / 3, RENEWAL_INTERVAL / 3, TimeUnit.MILLISECONDS);
        
        renewalTasks.put(key, future);
    }
    
    /**
     * 取消续期任务
     */
    private void cancelRenewal(String key) {
        ScheduledFuture<?> future = renewalTasks.remove(key);
        if (future != null) {
            future.cancel(false);
        }
    }
}

3.使用装饰器模式实现可重入锁

3.1代码如下
@Slf4j
@Component
public class ReentrantRedisLock {

    private final ThreadLocal<Map<String, Integer>> locksHolder = 
        ThreadLocal.withInitial(HashMap::new);
    
    @Autowired
    private AdvancedRedisLock redisLock;
    
    /**
     * 获取可重入锁
     */
    public boolean lock(String key, String value, long expireTime) {
        Map<String, Integer> counts = locksHolder.get();
        Integer count = counts.get(key);
        
        if (count != null && count > 0) {
            // 已经持有锁,计数加1
            counts.put(key, count + 1);
            return true;
        }
        
        // 首次获取锁
        if (redisLock.lockWithRenewal(key, value, expireTime)) {
            counts.put(key, 1);
            return true;
        }
        return false;
    }
    
    /**
     * 释放可重入锁
     */
    public boolean unlock(String key, String value) {
        Map<String, Integer> counts = locksHolder.get();
        Integer count = counts.get(key);
        
        if (count == null) {
            return false;
        }
        
        count--;
        if (count > 0) {
            // 还有重入次数,更新计数
            counts.put(key, count);
            return true;
        }
        
        // 释放锁
        counts.remove(key);
        return redisLock.unlockWithRenewal(key, value);
    }
}

4. 使用AOP实现分布式锁注解

4.1代码如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {
    String key();
    long expireTime() default 30000L;
    long waitTime() default 5000L;
}

@Aspect
@Component
@Slf4j
public class DistributedLockAspect {

    @Autowired
    private ReentrantRedisLock redisLock;
    
    @Around("@annotation(distributedLock)")
    public Object around(ProceedingJoinPoint point, DistributedLock distributedLock) 
            throws Throwable {
        String key = distributedLock.key();
        String value = UUID.randomUUID().toString();
        
        // 尝试获取锁
        long waitTime = distributedLock.waitTime();
        long startTime = System.currentTimeMillis();
        
        while (!redisLock.lock(key, value, distributedLock.expireTime())) {
            if (System.currentTimeMillis() - startTime > waitTime) {
                throw new RuntimeException("获取锁超时");
            }
            Thread.sleep(100);
        }
        
        try {
            return point.proceed();
        } finally {
            redisLock.unlock(key, value);
        }
    }
}

5.使用示例

5.1代码如下
@Service
@Slf4j
public class OrderService {

    @Autowired
    private ReentrantRedisLock redisLock;
    
    @DistributedLock(key = "'order:' + #orderId", expireTime = 30000)
    public void processOrder(String orderId) {
        try {
            // 处理订单逻辑
            log.info("Processing order: {}", orderId);
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public void manualLockExample(String orderId) {
        String key = "order:" + orderId;
        String value = UUID.randomUUID().toString();
        
        try {
            if (redisLock.lock(key, value, 30000)) {
                // 处理订单逻辑
                log.info("Processing order: {}", orderId);
            } else {
                log.warn("Failed to acquire lock for order: {}", orderId);
            }
        } finally {
            redisLock.unlock(key, value);
        }
    }
}

6.配置如下

6.1代码如下
@Configuration
public class RedisLockConfig {

    @Bean
    public ScheduledExecutorService scheduledExecutorService() {
        return Executors.newScheduledThreadPool(
            Runtime.getRuntime().availableProcessors(),
            new ThreadFactoryBuilder()
                .setNameFormat("redis-lock-renewal-%d")
                .setDaemon(true)
                .build()
        );
    }
}

原文地址:https://blog.csdn.net/weixin_45069056/article/details/144307546

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!