自学内容网 自学内容网

黑马程序员-redis项目实践笔记2

目录

三、

Redis实现全局唯一ID

实现优惠卷秒杀下单

超卖问题

一人一单(单例项目线程安全问题)

一人一单(集群环境下的并发问题)

分布式锁的基本原理和实现方式对比

Redis分布式锁

实现核心思路

实现代码

Redis分布式锁误删问题

业务阻塞造成锁超时释放问题

Lua脚本解决多条命令原子性问题

利用Java代码调用Lua脚本改造分布式锁

Redisson完善分布式锁

基本使用方式

Redisson锁的可重入原理

Redisson的锁重试和WatchDog机制

Redisson的MutiLock原理

小结


三、

Redis实现全局唯一ID

在各类购物App中,都会遇到商家发放的优惠券。当用户抢购商品时,生成的订单会保存到tb_voucher_order表中,而订单表如果使用数据库自增ID就会存在一些问题:id规律性太明显、受单表数据量的限制。如果我们的订单id有太明显的规律,那么对于用户或者竞争对手,就很容易猜测出我们的一些敏感信息,例如商城一天之内能卖出多少单,这明显不合适。

随着我们商城的规模越来越大,MySQL的单表容量不宜超过500W,数据量过大之后,我们就要进行拆库拆表,拆分表了之后,他们从逻辑上讲,是同一张表,所以他们的id不能重复,于是乎我们就要保证id的唯一性。那么这就引出我们的全局ID生成器了。

全局ID生成器是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足一下特性:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性

为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其他信息。

  • ID组成部分
    • 符号位:1bit,永远为0
    • 时间戳:31bit,以秒为单位,可以使用69年(2^31秒约等于69年)
    • 序列号:32bit,秒内的计数器,支持每秒传输2^32个不同ID

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
/**
 * 全局id生成器
 */
@Component
public class RedisIdWorker {
    /**
     * 开始时间戳 2023-1-1 00:00:00
     */
    private static final long BEGIN_TIMESTAMP = 1672531200L;

    /**
     * id生成器时间戳左移
     */
    private static final int COUNT_BITS = 32;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public long nextId(String keyPrefix){
        //1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        //2.生成序列号
        //2.1获取当前日期,精确到天
        //key精确到天这样做是为了防止超过32位的最大值,每一天一个key,而且以后方便统计
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        //2.2自增长
        Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
        //3.拼接并返回
        return timestamp << COUNT_BITS | count;
    }
}

实现优惠卷秒杀下单

下单时需要判断两点:秒杀是否开始或结束,如果尚未开始或已经结束则无法下单。库存是否充足,不足则无法下单。

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //查询优惠卷信息
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        //判断活动是否开始
        if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
            //未开始直接返回错误
            return Result.fail("秒杀活动还未开始");
        }
        //判断活动是否结束
        if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
            //已结束直接返回错误
            return Result.fail("秒杀活动已经结束");
        }
        //开始,判断库存是否充足
        Integer stock = seckillVoucher.getStock();
        if (stock < 1) {
            //不足,直接返回
            return Result.fail("库存不足");
        }
        //充足,扣减库存
        LambdaUpdateWrapper<SeckillVoucher> uw = new LambdaUpdateWrapper<>();
        uw.set(SeckillVoucher::getStock, stock - 1);
        uw.eq(SeckillVoucher::getVoucherId,voucherId);
        boolean updateFlag = seckillVoucherService.update(uw);
        if(!updateFlag){
            //更新失败
            return Result.fail("库存不足");
        }
        //创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long voucherOrderId = redisIdWorker.nextId("VoucherOrder");
        voucherOrder.setId(voucherOrderId);
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //未支付
        voucherOrder.setStatus(1);
        boolean save = this.save(voucherOrder);
        return Result.ok(voucherOrderId);
    }
}

超卖问题

假设现在只剩下一张优惠券,线程1过来查询库存,判断库存数大于1,但还没来得及去扣减库存,此时库线程2也过来查询库存,发现库存数也大于1,那么这两个线程都会进行扣减库存操作,最终相当于是多个线程都进行了扣减库存,那么此时就会出现超卖问题。

超卖问题,因为是多线程并发执行,可能会存在超卖问题,因此需要给数据加锁。

超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁。而对于加锁,我们通常有两种解决方案:

(1)悲观锁:悲观锁可以实现对于数据的串行化执行,比如syn,和lock都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁等等。

(2)乐观锁

版本号法:乐观锁会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过。

CAS:当然乐观锁还有一些变种的处理方式比如CAS。这里并不需要真的来指定一下版本号,完全可以使用stock来充当版本号,在扣减库存时,比较查询到的优惠券库存和实际数据库中优惠券库存是否相同

只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败。

在这种场景,我们可以只判断是否有剩余优惠券,即只要数据库中的库存大于0,都能顺利完成扣减库存操作。

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        //查询优惠卷信息
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        //判断活动是否开始
        if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
            //未开始直接返回错误
            return Result.fail("秒杀活动还未开始");
        }
        //判断活动是否结束
        if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
            //已结束直接返回错误
            return Result.fail("秒杀活动已经结束");
        }
        //开始,判断库存是否充足
        Integer stock = seckillVoucher.getStock();
        if (stock < 1) {
            //不足,直接返回
            return Result.fail("库存不足");
        }
        //充足,扣减库存
        LambdaUpdateWrapper<SeckillVoucher> uw = new LambdaUpdateWrapper<>();
        log.debug("stock - 1 ------> {}",stock - 1);
//        uw.set(SeckillVoucher::getStock, stock - 1);
        uw.setSql("stock = stock - 1");
        uw.eq(SeckillVoucher::getVoucherId,voucherId).gt(SeckillVoucher::getStock,0);
        boolean updateFlag = seckillVoucherService.update(uw);
        if(!updateFlag){
            //更新失败
            return Result.fail("库存不足");
        }
        //创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long voucherOrderId = redisIdWorker.nextId("VoucherOrder");
        voucherOrder.setId(voucherOrderId);
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //未支付
        voucherOrder.setStatus(1);
        boolean save = this.save(voucherOrder);
        return Result.ok(voucherOrderId);
    }
}

一人一单(单例项目线程安全问题)

需求:修改秒杀业务,要求同一个优惠券,一个用户只能抢一张

具体操作逻辑如下:我们在判断库存是否充足之后,根据我们保存的订单数据,判断用户订单是否已存在,如果已存在,则不能下单,返回错误信息。如果不存在,则继续下单,获取优惠券。

查询用户是否创建订单和判断订单是否存在这两块存在线程安全问题,需要加悲观锁

(1)细节一

synchronized不加载方法上,因为加载方法上锁的对象是this,this的范围太广了因此需要获取用户作为锁。

给从查询出订单到判断订单到新增订单这一段逻辑加锁,锁的粒度太粗了。在使用锁的过程中,控制锁粒度是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会被锁住。现在的情况就是所有用户都公用这一把锁,串行执行,效率很低,我们现在要完成的业务是一人一单,所以这个锁,应该只加在单个用户上,用户标识可以用userId。

(2)细节二:

userId是一个Long封装类型的对象,地址值会发生改变

由于toString的源码是new String,所以如果我们只用userId.toString()拿到的也不是同一个用户,需要使用intern(),如果字符串常量池中已经包含了一个等于这个string对象的字符串(由equals(object)方法确定),那么将返回池中的字符串。否则,将此String对象添加到池中,并返回对此String对象的引用。

(3)细节三:

synchronized不可以加载方法体里面,因为此方法有@Transactional事务进行管理。​​​​问题的原因在于当前方法被Spring的事务控制,如果你在内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放了,这样也会导致问题,所以我们选择将当前方法整体包裹起来,因此需要把锁在方法外调用,确保事务不会出现问题。

(4)细节四:

因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效。

所以这个地方,我们需要获得原始的事务对象, 来操作事务,这里可以使用aspectjweaver依赖,并且开启代理对象暴露,使用AopContext.currentProxy()来获取当前对象的代理对象,然后再用代理对象调用方法,记得要去IVoucherOrderService中创建createVoucherOrder方法。

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    public Result seckillVoucher(Long voucherId) {
        //查询优惠卷信息
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        //判断活动是否开始
        if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
            //未开始直接返回错误
            return Result.fail("秒杀活动还未开始");
        }
        //判断活动是否结束
        if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
            //已结束直接返回错误
            return Result.fail("秒杀活动已经结束");
        }

        /*
            细节一:synchronized不加载方法上,因为加载方法上锁的对象是this,this的范围太广了,因此需要获取用户作为锁。
            细节二:userId是一个Long封装类型的对象,地址值会发生改变,toString 返回的是一个newString的对象,地址值会发生变化。以上两种锁的值都会发生变化,因此需要使用.intern()方法把字符串对象转换为字符串池里面的,这个就不会发生变化了。
            细节三:synchronized不可以加载方法体里面,因为此方法有@Transactional事务进行管理,加了事务最后会由spring做提交,这个时候有可能spring还没提交,但是锁释放了还是有线程安全问题,因此需要把锁在方法外调用。
         */
        Long userId = UserHolder.getUser().getId();
        synchronized (userId.toString().intern()) {
            /*
                细节四:如果直接调用createVoucherOrder方法事务会失效,因为这时候使用的是this.方法名是一个对象,而spring管理的都是代理对象,因此需要获取代理对象,使用代理对象调用方法,并且需要给接口加上这个方法。
             */
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            return proxy.createVoucherOrder(voucherId);
        }
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        //一人一单
        Long userId = UserHolder.getUser().getId();
        //id值一样的一把锁
        //查询订单
        LambdaQueryWrapper<VoucherOrder> qw = new LambdaQueryWrapper<>();
        qw.eq(VoucherOrder::getUserId, userId);
        int count = count(qw);
        if (count > 0) {
            // 用户已经购买过
            log.debug("用户已经购买过一次!");
            return Result.fail("用户已经购买过一次!");
        }
        //充足,扣减库存
        LambdaUpdateWrapper<SeckillVoucher> uw = new LambdaUpdateWrapper<>();
//        uw.set(SeckillVoucher::getStock, stock - 1);
        uw.setSql("stock = stock - 1");
        uw.eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0);
        boolean updateFlag = seckillVoucherService.update(uw);
        if (!updateFlag) {
            //更新失败
            return Result.fail("库存不足");
        }
        //创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long voucherOrderId = redisIdWorker.nextId("VoucherOrder");
        voucherOrder.setId(voucherOrderId);
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //未支付
        voucherOrder.setStatus(1);
        boolean save = this.save(voucherOrder);
        return Result.ok(voucherOrderId);
    }
}

一人一单(集群环境下的并发问题)

通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。我们将服务启动两份,端口分别为8081和8082,然后修改nginx的config目录下的nginx.conf文件,配置反向代理和负载均衡(默认轮询就行)。

具体操作,我们使用POSTMAN发送两次请求,header携带同一用户的token,尝试用同一账号抢两张优惠券,发现是可行的。

失败原因分析:由于我们部署了多个Tomcat,每个Tomcat都有一个属于自己的jvm,那么假设在服务器A的Tomcat内部,有两个线程,即线程1和线程2,这两个线程使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的。但是如果在Tomcat的内部,又有两个线程,但是他们的锁对象虽然写的和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2互斥。

这就是集群环境下,syn锁失效的原因。在这种情况下,我们需要使用分布式锁来解决这个问题,让锁不存在于每个jvm的内部,而是让所有jvm公用外部的一把锁(Redis)

分布式锁的基本原理和实现方式对比

分布式锁是满足分布式系统或集群模式下多线程课件并且可以互斥的锁。

核心思想就是让大家共用同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路。

那么分布式锁应该满足一些什么条件呢?

  1. 可见性:多个线程都能看到相同的结果。注意:这里说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思。
  2. 互斥:互斥是分布式锁的最基本条件,使得程序串行执行。
  3. 高可用:程序不易崩溃,时时刻刻都保证较高的可用性。
  4. 高性能:由于加锁本身就让性能降低,所以对于分布式锁需要他较高的加锁性能和释放锁性能。
  5. 安全性:安全也是程序中必不可少的一环。

常见的分布式锁有三种,

  1. MySQL:本身就带有锁机制,但是由于MySQL的性能一般,所以使用MySQL作为分布式锁比较少见。
  2. Redis:作为分布式锁是非常常见的一种使用方式,利用SETNX这个方法,如果插入Key成功,则表示获得到了锁,如果有人插入成功,那么其他人就回插入失败,无法获取到锁,利用这套逻辑完成互斥,从而实现分布式锁。
  3. Zookeeper:也是企业级开发中较好的一种实现分布式锁的方案。

Redis分布式锁

实现核心思路

实现分布式锁时需要实现两个基本方法:

  1. 获取锁
    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  2. 释放锁
    • 手动释放
    • 超时释放:获取锁的时候添加一个超时时间

核心思路:我们利用redis的SETNX方法,当有多个线程进入时,我们就利用该方法来获取锁。第一个线程进入时,redis 中就有这个key了则返回1。如果返回结果是1则表示抢到了锁去执行业务,然后再删除锁,退出锁逻辑。如果返回结果是0则没有抢到锁,等待一定时间之后重试。

实现代码

锁的基本接口,

package com.hmdp.utils;

public interface ILock {
    /**
     * 尝试获取锁
     * @param timeoutSec 锁持有的超时时间,过期后自动释放
     * @return true代表获取锁成功;false表示获取锁失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

然后创建一个SimpleRedisLock类实现接口,

public class SimpleRedisLock implements ILock {
    private static final String KEY_PREFIX = "lock:";  //锁的前缀
    private String name;  //具体业务名称,将前缀和业务名拼接之后当做Key
    private StringRedisTemplate stringRedisTemplate; //这里不是@Autowired注入,采用的是构造器注入,在创建SimpleRedisLock时,将RedisTemplate作为参数传入

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程标识
        long threadId = Thread.currentThread().getId();
        //获取锁,使用SETNX方法进行加锁,同时设置过期时间,防止死锁
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success); //自动拆箱可能会出现null,这样写更稳妥
    }

    @Override
    public void unlock() {
        //通过DEL来删除锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

 修改业务代码,

package com.hmdp.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.SeckillVoucherMapper;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;

/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private RedisIdWorker redisIdWorker;

    @Override
    public Result seckillVoucher(Long voucherId) {
        //查询优惠卷信息
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        //判断活动是否开始
        if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
            //未开始直接返回错误
            return Result.fail("秒杀活动还未开始");
        }
        //判断活动是否结束
        if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
            //已结束直接返回错误
            return Result.fail("秒杀活动已经结束");
        }

        /*
            细节一:synchronized不加载方法上,因为加载方法上锁的对象是this,this的范围太广了。因此需要获取用户作为锁。
            细节二:userId是一个Long封装类型的对象,地址值会发生改变。toString 返回的是一个newString的对象,地址值会发生变化。以上两种锁的值都会发生变化。因此需要使用.intern()方法把字符串对象转换为字符串池里面的,这个就不会发生变化了
            细节三:synchronized不可以加载方法体里面,因为此方法有@Transactional事务进行管理。加了事务最后会由spring做提交,这个时候有可能spring还没提交,但是锁释放了还是有线程安全问题,因此需要把锁在方法外调用。
         */
        Long userId = UserHolder.getUser().getId();

        //创建Redis分布式锁对象
        SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
        //获取锁
        boolean isLock = simpleRedisLock.tryLock(5);
        if(!isLock){
            //获取锁失败,返回错误信息或重试
            //锁是用用户标识的,所以表示一个用户并发次数较多,所以直接返回失败
            return Result.fail("不允许重复下单!");
        }
        try {
            /*
                细节四:如果直接调用createVoucherOrder方法事务会失效,因为这时候使用的是this.方法名是一个对象,而spring管理的都是代理对象,因此需要获取代理对象,使用代理对象调用方法,并且需要给接口加上这个方法。
             */
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
        } finally {
            simpleRedisLock.unlock();
        }
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
        //一人一单
        Long userId = UserHolder.getUser().getId();
        //id值一样的一把锁
        //查询订单
        LambdaQueryWrapper<VoucherOrder> qw = new LambdaQueryWrapper<>();
        qw.eq(VoucherOrder::getUserId, userId);
        int count = count(qw);
        if (count > 0) {
            // 用户已经购买过
            log.debug("用户已经购买过一次!");
            return Result.fail("用户已经购买过一次!");
        }
        //充足,扣减库存
        LambdaUpdateWrapper<SeckillVoucher> uw = new LambdaUpdateWrapper<>();
//        uw.set(SeckillVoucher::getStock, stock - 1);
        uw.setSql("stock = stock - 1");
        uw.eq(SeckillVoucher::getVoucherId, voucherId).gt(SeckillVoucher::getStock, 0);
        boolean updateFlag = seckillVoucherService.update(uw);
        if (!updateFlag) {
            //更新失败
            return Result.fail("库存不足");
        }
        //创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        long voucherOrderId = redisIdWorker.nextId("VoucherOrder");
        voucherOrder.setId(voucherOrderId);
        voucherOrder.setVoucherId(voucherId);
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //未支付
        voucherOrder.setStatus(1);
        boolean save = this.save(voucherOrder);
        return Result.ok(voucherOrderId);
    }
}

Redis分布式锁误删问题

Redis分布式锁误删情况的逻辑说明:

  • 持有锁的线程1在锁的内部出现了阻塞,导致他的锁TTL到期,自动释放
  • 此时线程2也来尝试获取锁,由于线程1已经释放了锁,所以线程2可以拿到
  • 但是现在线程1阻塞完了,继续往下执行,要开始释放锁了
  • 那么此时就会将属于线程2的锁释放,这就是误删别人锁的情况

解决方案就是在每个线程存入锁的时候放入自己的线程标识,在删除锁的时候,判断当前这把锁是不是自己存入的,如果是则进行删除,如果不是则不进行删除

假设还是上面的情况,线程1阻塞,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1阻塞完了,继续往下执行,开始删除锁,但是线程1发现这把锁不是自己的,所以不进行删除锁的逻辑,当线程2执行到删除锁的逻辑时,如果TTL还未到期,则判断当前这把锁是自己的,于是删除这把锁。

private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
    // 获取线程标识
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
    // 获取当前线程的标识
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标识
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标识是否一致
    if (threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

业务阻塞造成锁超时释放问题

更为极端的误删逻辑说明:

  • 假设线程1已经获取了锁,在判断标识一致之后,准备释放锁的时候,又出现了阻塞(例如JVM垃圾回收机制),于是锁的TTL到期了,自动释放了。
  • 那么现在线程2趁虚而入,拿到了一把锁。
  • 但是线程1的逻辑还没执行完,那么线程1就会执行删除锁的逻辑,但是在阻塞前线程1已经判断了标识一致,所以现在线程1把线程2的锁给删了,那么就相当于判断标识那行代码没有起到作用。

这就是删锁时的原子性问题,因为线程1的拿锁,判断标识和删锁不是原子操作,所以我们要防止刚刚的情况。

Lua脚本解决多条命令原子性问题

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,可以使用Lua去操作Redis,而且还能保证它的原子性,这样拿锁,判断标识,删锁就可以实现是一个原子性动作了。

1)Redis提供的调用函数语法如下,

redis.call('命令名称','key','其他参数', ...)


## 先执行set name David
redis.call('set', 'name', 'David')
## 再执行get name
local name = redis.call('get', 'name')
## 返回
return name

2)写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下,3)如果脚本中的key和value不想写死,可以作为参数传递,key类型参数会放入KEYS数组,其他参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组中获取这些参数。

原逻辑:

@Override
public void unlock() {
    // 获取当前线程的标识
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标识
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标识是否一致
    if (threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

改为,

-- 线程标识
local threadId = "UUID-31"
-- 锁的key
local key = "lock:order:userId"
-- 获取锁中线程标识
local id = redis.call('get', key)
-- 比较线程标识与锁的标识是否一致
if (threadId == id) then
    -- 一致则释放锁 del key
    return redis.call('del', key)
end
return 0



-- 这里的KEYS[1]就是传入锁的key
-- 这里的ARGV[1]就是线程标识
-- 比较锁中的线程标识与线程标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then
    -- 一致则释放锁
    return redis.call('del', KEYS[1])
end
return 0

利用Java代码调用Lua脚本改造分布式锁

在RedisTemplate中,可以利用execute方法去执行lua脚本,

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
    private String name;
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    private static final String KEY_PREFIX = "lock:";
    //糊涂包的randomUUID().toString可以更上true,可以去掉横线
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

    static {
        //加载lua脚本
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        //ClassPathResource类 默认去classpath下找
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        //获取线程表示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //获取锁 setIfAbsent = setnx
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }


    @Override
    public void unlock() {
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId()
        );
    }
/*    @Override
    public void unlock() {
        //获取线程标识
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        //获取锁中的标识
        String lockId = stringRedisTemplate.opsForValue().get("KEY_PREFIX + name");
        //判断两者是否一致
        if(!threadId.equals(lockId)){
            //不一致,说明锁自动过期了,什么也不干
            return;
        }
        //释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }*/
}

Redisson完善分布式锁

基本使用方式

导入依赖,

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

配置Redisson客户端,在config包下新建RedissonConfig类,

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
            .setAddress("redis://101.XXX.XXX.160:6379")
            .setPassword("root");
        return Redisson.create(config);
    }
}

使用Redisson的分布式锁,

@Resource
private RedissonClient redissonClient;

@Test
void testRedisson() throws InterruptedException {
    //获取可重入锁
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,三个参数分别是:获取锁的最大等待时间(期间会重试),锁的自动释放时间,时间单位
    boolean success = lock.tryLock(1,10, TimeUnit.SECONDS);
    //判断获取锁成功
    if (success) {
        try {
            System.out.println("执行业务");
        } finally {
            //释放锁
            lock.unlock();
        }
    }
}

然后,修改一人一单代码,

    @Override
    public Result seckillVoucher(Long voucherId) {
        //查询优惠卷信息
        SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
        //判断活动是否开始
        if (LocalDateTime.now().isBefore(seckillVoucher.getBeginTime())) {
            //未开始直接返回错误
            return Result.fail("秒杀活动还未开始");
        }
        //判断活动是否结束
        if (LocalDateTime.now().isAfter(seckillVoucher.getEndTime())) {
            //已结束直接返回错误
            return Result.fail("秒杀活动已经结束");
        }

        /*
            细节一:
                synchronized不加载方法上,因为加载方法上锁的对象是this,this的范围太广了
                因此需要获取用户作为锁
            细节二:
                userId是一个Long封装类型的对象,地址值会发生改变
                toString 返回的是一个newString的对象,地址值会发生变化
                以上两种锁的值都会发生变化
                因此需要使用.intern()方法把字符串对象转换为字符串池里面的,这个就不会发生变化了
            细节三:
                synchronized不可以加载方法体里面,因为此方法有@Transactional事务进行管理
                加了事务最后会由spring做提交,这个时候有可能spring还没提交,但是锁释放了还是有线程安全问题
                因此需要把锁在方法外调用
         */
        Long userId = UserHolder.getUser().getId();

        //创建Redis分布式锁对象
//        SimpleRedisLock simpleRedisLock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
//        boolean isLock = simpleRedisLock.tryLock(10);
        boolean isLock = lock.tryLock();
        if(!isLock){
            //获取锁失败,返回错误信息或重试
            //锁是用用户标识的,所以表示一个用户并发次数较多,所以直接返回失败
            return Result.fail("不允许重复下单!");
        }
        try {
            /*
                细节四:如果直接调用createVoucherOrder方法事务会失效
                ,因为这时候使用的是this.方法名是一个对象,而spring管理的都是代理对象,
                因此需要获取代理对象,使用代理对象调用方法,并且需要给接口加上这个方法
             */
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        return proxy.createVoucherOrder(voucherId);
        } finally {
            lock.unlock();
        }
    }

Redisson锁的可重入原理

Redisson可重入锁原理是使用hash存储到redis,在同一个线程多次获取锁就让锁计数器+1,释放锁的时候判断锁是否为0,为0说明线程中没有业务在执行了可以释放,如果不为0说明还有其他线程在执行,重新设置有效期后继续执行业务重复直到为0。

获取锁的逻辑

local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 锁不存在
if (redis.call('exists', key) == 0) then
    -- 获取锁并添加线程标识,state设为1
    redis.call('hset', key, threadId, '1');
    -- 设置锁有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;
-- 锁存在,判断threadId是否为自己
if (redis.call('hexists', key, threadId) == 1) then
    -- 锁存在,重入次数 +1,这里用的是hash结构的incrby增长
    redis.call('hincrby', key, thread, 1);
    -- 设置锁的有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
end;
return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁的逻辑

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
-- 如果锁不是自己的
if (redis.call('HEXISTS', key, threadId) == 0) then
    return nil; -- 直接返回
end;
-- 锁是自己的,锁计数-1,还是用hincrby,不过自增长的值为-1
local count = redis.call('hincrby', key, threadId, -1);
-- 判断重入次数为多少
if (count > 0) then
    -- 大于0,重置有效期
    redis.call('expire', key, releaseTime);
    return nil;
else
    -- 否则直接释放锁
    redis.call('del', key);
    return nil;
end;

Redisson的锁重试和WatchDog机制

总结一下,Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime/3),重置超时时间 

Redisson的MutiLock原理

为了提高Redis的可用性,我们会搭建集群或者主从,现在以主从为例。

此时我们去写命令,写在主机上,主机会将数据同步给从机,但是假设主机还没来得及把数据写入到从机去的时候,主机宕机了。哨兵会发现主机宕机了,于是选举一个slave(从机)变成master(主机),而此时新的master(主机)上并没有锁的信息,那么其他线程就可以获取锁,又会引发安全问题。

为了解决这个问题。Redisson提出来了MutiLock锁。使用这把锁的话,那我们就不用主从了,每个节点的地位都是一样的,都可以当做是主机,那我们就需要将加锁的逻辑写入到每一个主从节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获取锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。

我们先使用虚拟机额外搭建两个Redis节点,

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://192.168.137.130:6379")
                .setPassword("root");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://92.168.137.131:6379")
                .setPassword("root");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://92.168.137.132:6379")
                .setPassword("root");
        return Redisson.create(config);
    }
}

首先要注入三个RedissonClient对象,然后使用联锁,

@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;

private RLock lock;

@BeforeEach
void setUp() {
    RLock lock1 = redissonClient.getLock("lock");
    RLock lock2 = redissonClient2.getLock("lock");
    RLock lock3 = redissonClient3.getLock("lock");
    // 创建联锁
    lock = redissonClient.getMultiLock(lock1, lock2, lock3);
}

@Test
void method1() {
    boolean success = lock.tryLock();
    redissonClient.getMultiLock();
    if (!success) {
        log.error("获取锁失败,1");
        return;
    }
    try {
        log.info("获取锁成功");
        method2();
    } finally {
        log.info("释放锁,1");
        lock.unlock();
    }
}

void method2() {
    RLock lock = redissonClient.getLock("lock");
    boolean success = lock.tryLock();
    if (!success) {
        log.error("获取锁失败,2");
        return;
    }
    try {
        log.info("获取锁成功,2");
    } finally {
        log.info("释放锁,2");
        lock.unlock();
    }
}

在源码中,当我们没有传入锁对象来创建联锁的时候,则会抛出一个异常,反之则将我们传入的可变参数锁对象封装成一个集合,

public RedissonMultiLock(RLock... locks) {
    if (locks.length == 0) {
        throw new IllegalArgumentException("Lock objects are not defined");
    } else {
        this.locks.addAll(Arrays.asList(locks));
    }
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1L;
    //如果传入了释放时间
    if (leaseTime != -1L) {
        //再判断一下是否有等待时间
        if (waitTime == -1L) {
            //如果没传等待时间,不重试,则只获得一次
            newLeaseTime = unit.toMillis(leaseTime);
        } else {
            //想要重试,耗时较久,万一释放时间小于等待时间,则会有问题,所以这里将等待时间乘以二
            newLeaseTime = unit.toMillis(waitTime) * 2L;
        }
    }
    //获取当前时间
    long time = System.currentTimeMillis();
    //剩余等待时间
    long remainTime = -1L;
    if (waitTime != -1L) {
        remainTime = unit.toMillis(waitTime);
    }
    //锁等待时间,与剩余等待时间一样    
    long lockWaitTime = this.calcLockWaitTime(remainTime);
    //锁失败的限制,源码返回是的0
    int failedLocksLimit = this.failedLocksLimit();
    //已经获取成功的锁
    List<RLock> acquiredLocks = new ArrayList(this.locks.size());
    //迭代器,用于遍历
    ListIterator<RLock> iterator = this.locks.listIterator();

    while(iterator.hasNext()) {
        RLock lock = (RLock)iterator.next();

        boolean lockAcquired;
        try {
            //没有等待时间和释放时间,调用空参的tryLock
            if (waitTime == -1L && leaseTime == -1L) {
                lockAcquired = lock.tryLock();
            } else {
                //否则调用带参的tryLock
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException var21) {
            this.unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception var22) {
            lockAcquired = false;
        }
        //判断获取锁是否成功
        if (lockAcquired) {
            //成功则将锁放入成功锁的集合
            acquiredLocks.add(lock);
        } else {
            //如果获取锁失败
            //判断当前锁的数量,减去成功获取锁的数量,如果为0,则所有锁都成功获取,跳出循环
            if (this.locks.size() - acquiredLocks.size() == this.failedLocksLimit()) {
                break;
            }
            //否则将拿到的锁都释放掉
            if (failedLocksLimit == 0) {
                this.unlockInner(acquiredLocks);
                //如果等待时间为-1,则不想重试,直接返回false
                if (waitTime == -1L) {
                    return false;
                }

                failedLocksLimit = this.failedLocksLimit();
                //将已经拿到的锁都清空
                acquiredLocks.clear();
                //将迭代器往前迭代,相当于重置指针,放到第一个然后重试获取锁
                while(iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                --failedLocksLimit;
            }
        }
        //如果剩余时间不为-1,很充足
        if (remainTime != -1L) {
            //计算现在剩余时间
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            //如果剩余时间为负数,则获取锁超时了
            if (remainTime <= 0L) {
                //将之前已经获取到的锁释放掉,并返回false
                this.unlockInner(acquiredLocks);
                //联锁成功的条件是:每一把锁都必须成功获取,一把锁失败,则都失败
                return false;
            }
        }
    }
    //如果设置了锁的有效期
    if (leaseTime != -1L) {
        List<RFuture<Boolean>> futures = new ArrayList(acquiredLocks.size());
        //迭代器用于遍历已经获取成功的锁
        Iterator var24 = acquiredLocks.iterator();

        while(var24.hasNext()) {
            RLock rLock = (RLock)var24.next();
            //设置每一把锁的有效期
            RFuture<Boolean> future = ((RedissonLock)rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }

        var24 = futures.iterator();

        while(var24.hasNext()) {
            RFuture<Boolean> rFuture = (RFuture)var24.next();
            rFuture.syncUninterruptibly();
        }
    }
    //但如果没设置有效期,则会触发WatchDog机制,自动帮我们设置有效期,所以大多数情况下,我们不需要自己设置有效期
    return true;
}

小结

  1. 不可重入Redis分布式锁
    • 原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
    • 缺陷:不可重入、无法重试、锁超时失效
  2. 可重入Redis分布式锁
    • 原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
    • 缺陷:Redis宕机引起锁失效问题
  3. Redisson的multiLock
    • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功

原文地址:https://blog.csdn.net/qq_45956730/article/details/142856129

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!