自学内容网 自学内容网

项目中使用消息队列改进——基于RabbitMQ

使用 RabbitMQ 实现消息队列

  1. 导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--防止消息转换时的乱码-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  1. 配置RabbitMQ
spring:
  rabbitmq:
    host: 192.168.72.100
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: zhl36 # 用户名
    password: zhl15737979065  # 密码
  1. 定义消息转换器,将 需要发送的消息对象 转成 json,否则在 RabbitMQ 的控制台获取的消息可能乱码
@Configuration
public class MessageConvert {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  1. 编写生产者和消费者
@Service(value = "VoucherOrderServiceImplPlus")
public class VoucherOrderServiceImplPlus 
extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

@Autowired
    private ISeckillVoucherService seckillVoucherService;
    /**
     * 全局id生成器
     */
    @Autowired
    private RedisIdWorker redisIdWorker;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    // 使用 RabbitMQ 做消息队列
    @Autowired
    private RabbitTemplate rabbitTemplate;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
// 在类加载时加载lua脚本
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }
    
/**
     * 生产者处理消息
     * @param voucherOrder
     */
    public void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock();
        if (!isLock){
            log.error("请勿重复下单");
        }
        try {
            rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,VOUCHERORDER_ROUTE_KEY,voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    
@Override
    public Result secKillVoucher(Long voucherId) {
        /**
         * 执行lua脚本,判断返回值是否=0
         * ==0,可以正常下单
         * ==1,库存不足
         * ==2,用户重复下单
         */
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(),
                userId.toString());
        int r = result.intValue();
        if (r != 0) {
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        long orderId = redisIdWorker.nextId("order");

        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        // 把有购买资格的,把下单信息保存到 RabbitMQ
        handleVoucherOrder(voucherOrder);
        return Result.ok(orderId);
    }
    
/**
     * 消费者处理优惠券订单 
     * 交换机使用的是默认的 DIRECT 方式
     * @param voucherOrder
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("voucherOrder.queue1"),
            exchange = @Exchange(name = EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT),
            key = {VOUCHERORDER_ROUTE_KEY}))
    @Override
    public void createVocherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        if (count > 0) {
            log.error("您已购买过");
        }
        // 库存--
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1")
                // 实际上 stock > 0 是 CAS 以乐观锁的方式防止超卖  where voucher_id = ? and stock > 0 ?
                .eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
                .update();
        if (!success) {
            log.error("库存不足");
        }
        // 订单++
        save(voucherOrder);
    }

基于Redis实现消息队列

基于List 的消息队列

Redis 的List 数据结构是一个双向链表,很容易模拟出队列效果。

https://redis.io/docs/latest/commands/?group=list

可以使用 BLPOP 或 BRPOP 实现阻塞效果,当有消息时就处理并返回。BLPOP 结合 RPUSH 或 BRPOP 结合 LPUSH 来实现。
在这里插入图片描述

基于List的消息队列的优缺点

优点:

  • 利用Redis 存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
在这里插入图片描述

基于PubSub的消息队列的优缺点

优点:

  • 采用发布订阅模型,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

发送消息的命令:XADD
在这里插入图片描述读取消息的方式之一:XREAD
在这里插入图片描述

个人感觉太麻烦了,不如简单又好用的RabbitMQ😘


原文地址:https://blog.csdn.net/m0_72406127/article/details/137558655

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!