项目中使用消息队列改进——基于RabbitMQ
使用 RabbitMQ 实现消息队列
- 导入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--防止消息转换时的乱码-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 配置RabbitMQ
spring:
rabbitmq:
host: 192.168.72.100
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: zhl36 # 用户名
password: zhl15737979065 # 密码
- 定义消息转换器,将 需要发送的消息对象 转成 json,否则在 RabbitMQ 的控制台获取的消息可能乱码
@Configuration
public class MessageConvert {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
- 编写生产者和消费者
@Service(value = "VoucherOrderServiceImplPlus")
public class VoucherOrderServiceImplPlus
extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
/**
* 全局id生成器
*/
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private RedissonClient redissonClient;
// 使用 RabbitMQ 做消息队列
@Autowired
private RabbitTemplate rabbitTemplate;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
// 在类加载时加载lua脚本
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/**
* 生产者处理消息
* @param voucherOrder
*/
public void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock){
log.error("请勿重复下单");
}
try {
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,VOUCHERORDER_ROUTE_KEY,voucherOrder);
} finally {
lock.unlock();
}
}
@Override
public Result secKillVoucher(Long voucherId) {
/**
* 执行lua脚本,判断返回值是否=0
* ==0,可以正常下单
* ==1,库存不足
* ==2,用户重复下单
*/
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString());
int r = result.intValue();
if (r != 0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
// 把有购买资格的,把下单信息保存到 RabbitMQ
handleVoucherOrder(voucherOrder);
return Result.ok(orderId);
}
/**
* 消费者处理优惠券订单
* 交换机使用的是默认的 DIRECT 方式
* @param voucherOrder
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("voucherOrder.queue1"),
exchange = @Exchange(name = EXCHANGE_DIRECT, type = ExchangeTypes.DIRECT),
key = {VOUCHERORDER_ROUTE_KEY}))
@Override
public void createVocherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("您已购买过");
}
// 库存--
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
// 实际上 stock > 0 是 CAS 以乐观锁的方式防止超卖 where voucher_id = ? and stock > 0 ?
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
.update();
if (!success) {
log.error("库存不足");
}
// 订单++
save(voucherOrder);
}
基于Redis实现消息队列
基于List 的消息队列
Redis 的List 数据结构是一个双向链表,很容易模拟出队列效果。
可以使用 BLPOP 或 BRPOP 实现阻塞效果,当有消息时就处理并返回。BLPOP 结合 RPUSH 或 BRPOP 结合 LPUSH 来实现。
基于List的消息队列的优缺点
优点:
- 利用Redis 存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列的优缺点
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
发送消息的命令:XADD
读取消息的方式之一:XREAD
个人感觉太麻烦了,不如简单又好用的RabbitMQ😘
原文地址:https://blog.csdn.net/m0_72406127/article/details/137558655
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!