【RabbitMQ】死信(延迟队列)的使用
目录
一、介绍
1、什么是死信队列(延迟队列)
- 死信,在官网中对应的单词为“Dead Letter”,它是 RabbitMQ 的一种消息机制。
- 死信队列(Dead Letter Queue)和延迟队列(Delay Queue)是两种不同的队列类型,但在实际应用中它们可以结合使用。
- 死信队列是当消息在队列中因为过期、被拒绝等原因无法正常处理时,会被重新发送到另一个交换机上,这个交换机就是死信交换机。死信队列可以用于实现重试机制、日志审计等特殊应用逻辑。
延迟队列则是一种特殊的队列类型,它允许将消息延迟指定的时间后才能被消费者消费。这种队列通常用于处理那些需要在特定时间点被处理的任务,例如定时任务、限时优惠等。在RabbitMQ中,可以通过设置消息的TTL(生存时间)来实现延迟队列的功能。当消息在队列中超过了TTL,它就会被移除并被发送到指定的死信交换机,进而被路由到死信队列中。
结合使用死信队列和延迟队列可以实现一些复杂的应用逻辑。
例如:
可以将某个需要延迟处理的消息发送到延迟队列中,并在消息过期之前将其存储在死信队列中。这样,当消息从延迟队列中移除时,它会被自动发送到死信队列中,然后由消费者消费并执行相应的操作。这种结合使用的方式可以提供更高的灵活性和可靠性,使得系统能够更好地应对各种异常情况。
2、应用场景
死信队列(Dead Letter Queue)和延迟队列(Delay Queue)在以下应用场景中表现优异:
这些场景都是通过结合使用死信队列和延迟队列来提高系统的可靠性、鲁棒性和灵活性。通过合理地设置死信队列和延迟队列的参数,可以实现各种复杂的业务逻辑和异常处理机制。
- 库存解锁服务:例如,当一个商品被锁定且无法被购买时,可以将其放入死信队列中,并在一定时间后重新发送到原始队列进行处理,从而解锁库存。
- 定时关单功能:例如,用户在商城下单成功并点击去支付后,如果在指定时间内未支付,系统可以自动将订单放入死信队列中,并在一定时间后重新发送到原始队列进行处理。
- 保证订单业务的消息数据不丢失:当消息消费发生异常时,可以将消息投入死信队列中,以便后续等到环境好了之后,再消费死信队列中的消息。
- 实现重试机制:当某个消息处理失败时,可以将它放入死信队列中,并在一段时间后重新发送到交换机进行重试。这样可以提高系统的鲁棒性,确保消息能够被正确处理。
- 处理定时任务和秒杀活动:使用延迟队列可以将消息延迟到特定的时间后进行处理,例如定时任务、秒杀活动等。这样可以确保在特定的时间点执行相应的操作。
- 日志处理和审计:将日志消息发送到死信队列中,可以在日志发生异常时进行记录和审计,以便分析和排查问题。
3、死信队列(延迟队列)的使用
使用死信队列(Dead Letter Queue)和延迟队列(Delay Queue)可以提高系统的可靠性和灵活性,特别是在处理异常情况、重试机制和定时任务等方面。
需要注意的是,在使用死信队列和延迟队列时,需要考虑系统的可用性和性能。如果大量的消息被放入死信队列中,可能会导致系统资源的过度消耗。因此,需要合理配置死信队列和延迟队列的大小和数量,以及监控和管理系统的性能和资源使用情况。
- 定义死信队列和延迟队列:在RabbitMQ中,可以在定义队列时指定队列类型为死信队列或延迟队列。例如,使用RabbitMQ的命令行工具或管理界面创建死信队列或延迟队列。
- 配置交换机和队列属性:在绑定交换机和队列时,可以设置一些属性来控制消息的路由和消费。例如,可以设置消息的TTL(生存时间)来实现延迟队列的功能,或者设置消息的优先级和延迟时间等属性。
- 发送消息到队列:使用RabbitMQ的生产者将消息发送到定义的队列中。如果需要将消息放入延迟队列中,可以在发送消息时设置相应的延迟时间。
- 处理消息:消费者从队列中获取消息并进行处理。如果消息无法正常处理,可以将它放入死信队列中。在处理消息时,可以根据需要设置重试机制,以便在消息处理失败时重新发送消息到队列中进行重试。
- 监控和管理:使用RabbitMQ的管理界面或命令行工具监控和管理死信队列和延迟队列的状态和消息。可以查看队列的大小、消息的延迟时间、消费者数量等信息,并根据需要进行调整和管理。
4、死信消息来源
- 消息 TTL 过期
- 队列满了,无法再次添加数据
- 消息被拒绝(reject 或 nack),并且 requeue =false
二、案例实践
1、案例一
生产者
在生产者的Config里面添加队列 。
// =============死信队列===============
// 定义QueueA Bean,返回QueueA实例
@Bean
public Queue QueueA() {
Map<String, Object> config = new HashMap<>();
//message在该队列queue的存活时间最大为5秒
config.put("x-message-ttl", 5000);
//x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
config.put("x-dead-letter-exchange", "deadExchange");
//x-dead-letter-routing-key参数是给这个DLX指定路由键
config.put("x-dead-letter-routing-key", "deadQueue");
// 返回QueueA实例
return new Queue("QueueA", true, true, false, config);
}
// 定义DirectExchangeA Bean,返回DirectExchangeA实例
@Bean
public DirectExchange DirectExchangeA() {
// 返回DirectExchangeA实例
return new DirectExchange("DirectExchangeA");
}
// 定义bindingA Bean,将QueueA绑定到DirectExchangeA,并设置路由键为A.A
@Bean
public Binding bindingA() {
// 将QueueA绑定到DirectExchangeA
return BindingBuilder
// 设置路由键为A.A
.bind(QueueA())
.to(DirectExchangeA())
.with("A.A");
}
// 定义QueueB Bean,返回QueueB实例
@Bean
public Queue QueueB() {
// 返回QueueB实例
return new Queue("QueueB");
}
// 定义DirectExchangeB Bean,返回DirectExchangeB实例
@Bean
public DirectExchange DirectExchangeB() {
// 返回DirectExchangeB实例
return new DirectExchange("DirectExchangeB");
}
// 定义bindingB Bean,将QueueB绑定到DirectExchangeB,并设置路由键为B.B
@Bean
public Binding bindingB() {
// 将QueueB绑定到DirectExchangeB
return BindingBuilder
// 设置路由键为B.B
.bind(QueueB())
.to(DirectExchangeB())
.with("B.B");
}
消费者
在消费者里面添加QueueA和QueueB
QueueA
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueA")
public class ReceiverQA {
// 接收directExchange01交换机中Queue02队列消息的方法
@RabbitHandler
public void Queue02(String msg) {
log.warn("QueueA,接收到信息:" + msg);
}
}
QueueB
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@Slf4j
@RabbitListener(queues = "QueueB")
public class ReceiverQB {
// 接收directExchange01交换机中Queue02队列消息的方法
@RabbitHandler
public void Queue02(String msg) {
log.warn("QueueB,接收到信息:" + msg);
}
}
编写Controller层
@RequestMapping("send5")
public String send5() {
rabbitTemplate.convertAndSend("DirectExchangeA", "A.A", "Hello,A");
return "🐉";
}
结果:
2、案例二(消息接收确认 )
-
如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限
-
ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟
-
消息确认模式有:
-
AcknowledgeMode.NONE:自动确认
-
AcknowledgeMode.AUTO:根据情况确认
-
AcknowledgeMode.MANUAL:手动确认
-
配置文件
默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manualspring: rabbitmq: listener: simple: acknowledge-mode: manual
在消费者里的更改ReceiverQA
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.io.IOException; @Component @SuppressWarnings("all") @Slf4j @RabbitListener(queues = "QueueA") public class ReceiverQA { // 接收directExchange01交换机中Queue02队列消息的方法 @RabbitHandler public void QueueA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException { log.warn("QueueA,接收到信息:" + msg); channel.basicAck(tag, false); } }
需要注意的 basicAck 方法需要传递两个参数
deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
3、总结
-
持久化
-
exchange要持久化
-
queue要持久化
-
message要持久化
-
-
消息确认
-
启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
-
生产者和Server(broker)之间的消息确认
-
消费者和Server(broker)之间的消息确认
-
原文地址:https://blog.csdn.net/weixin_74383330/article/details/135843431
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!