RabbitMQ--发送方确认及消息重试
(一)发送方确认
之前我们在七大工作模式中简单说了发送方确认,就是生产者到RabbitMQ这一过程中,消息是否正确到达服务器,生产者要进行确认的过程
一共有两种确认模式
1.confirm确认模式
是生产者到交换机的阶段,生产者进行确认的过程,之后还有return模式,是交换机到消息队列,确认的过程
生产者发送消息的时候,对发送端设置一个ConfirmCallback监听,无论消息是否到达交换机,这个监听都会执行,如果交换机收到了,就会ACK就会为true否则为false
然后我们来看代码
首先还是要更改配置文件
spring:
rabbitmq:
addresses: amqp://student:student@62.234.46.219:5672/test
listener:
simple:
# acknowledge-mode: NONE
# acknowledge-mode: AUTO
acknowledge-mode: MANUAL
publisher-confirm-type: correlated #消息发送确认
然后声明交换机和队列
@Bean("confirmExchange")
public Exchange confirmExchange(){
return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).durable(true).build();
}
@Bean("confirmQueue")
public Queue comfirmQueue(){
return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
}
@Bean("confirmBind")
public Binding confirmBind(@Qualifier("confirmExchange") Exchange confrimExchange,@Qualifier("confirmQueue") Queue queue){
return BindingBuilder.bind(queue).to(confrimExchange).with("confirm").noargs();
}
然后我们来看生产者代码(有一点问题,错误示范)
public class AckProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")
public String confirmRabbit(){
// RabbitTemplate rabbitTemplate1=new RabbitTemplate();
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行回调方法");
if(ack){
System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
}else {
System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
//业务处理
}
}
});
CorrelationData correlationData=new CorrelationData("1");
rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","confirm模式",correlationData);
return "发送成功";
}
}
之所以说是错误示范,我们可以先执行一下试试
我们发现也没什么错误啊,我们再来发送一遍
这时就会报错了,告诉我们一个RabbitTemplate只能支持一个回调方法
之所以会报这个错,是因为我们每次调用接口,都会给这个RabbitTemplate再设置一遍回调方法就会出错,
还有一个问题就是,我们既然是针对RabbitTemplate来设置的,那么所有使用此RabbitTemplate的接口,都会被影响,所以我们应该把他设置为多例的
那接下来我们就来看看怎么设置多个
@Configuration
public class Rabbit {
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);
confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行回调方法");
if(ack){
System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
}else {
System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
//业务处理
}
}
});
return confirmRabbitTemplate;
}
}
上面 的ConnectionFactory是我们配置文件写好后,自动给我们创建的,用来创建RabbitTemplate
那接下来我们使用这个新创建的ConfirmRabbitTemplate来测试下是否能解决我们的问题
我多次访问了接口,没有问题,那我们接口不能重复访问的问题就解决了,那能不能解决我们不同接口,被同时影响的问题?
我们发现还是不可以的,这是因为我们本身注入的是Spring框架给我们提供的,我们如果写了一个RabbitTemplate之后,spring就不会再给我们提供了,就会导致Autowired注解,通过类型查找,只能找到一个RabbitTemplate,就使用了,如果我们想解决这个问题,就需要我们自己再另外创建一个
@Bean
public RabbitTemplate RabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate RabbitTemplate =new RabbitTemplate(connectionFactory);
return RabbitTemplate;
}
这样我们再来重新试一下
此时我们第二种问题也正确了
此时我们再来看,如果我们故意放一个错误的routingkey
我们发现还是执行正常,但是我们的交换机根本映射不到队列中,还是会造成消息丢失,此时该如何处理,这是就轮到我们的return模式了
2.return退回模式
消息到达交换机,根据路由规则匹配并把消息放到队列中,在这个过程中,如果一个队列都无法匹配成功,就可以通过此模式把消息退回给生产者
代码跟confirm模式很像,而且同样也会有上面两种问题,所以这里不再说了,直接上代码
@Bean
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate confirmRabbitTemplate =new RabbitTemplate(connectionFactory);
confirmRabbitTemplate.setMandatory(true);
//也是一种回调函数
confirmRabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned.getMessage().getBody()+"被退回");
}
});
confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("执行回调方法");
if(ack){
System.out.println("交换机成功接收到消息 id: "+correlationData.getId());
}else {
System.out.println("未接收到消息id: "+correlationData.getId()+"原因是: "+cause);
//业务处理
}
}
});
return confirmRabbitTemplate;
}
记得我们要设置SetMandory为(true)否则不执行
3.如何保证RabbitMQ的消息可靠传输(面试题)
我们一步步说
1)首先是生产者到交换机,此时可能会因为网络问题,导致生产者的消息发送到交换机失败,此时我们可以通过发送方确认中的confirm模式来解决
2)交换机无法找到对应队列,我们可以通过发送方确认中的return模式来处理返回消息
3)RabbitMQ服务宕机了,我们可以设置消息的持久化,队列持久化,交换机持久化,来保存消息到硬盘中,但是可能会有一些消息在缓冲区中还没有写入硬盘(这些消息就会丢失)
4)RabbitMQ到消费者过程中,可能因为消费者代码或者网络问题,造成消息丢失,此时可以用消息确认来解决(自动确认,手动确认)
(二)重试机制
在消息传递过程中,会遇到一些问题,可能会导致消费者消费消息的时候,处理失败,那对此RabbitMQ给我们提供了自动确认和手动确认,同时也给我们提供了重试机制,允许在消息处理失败后,重新发送
重试机制,在自动确认和手动确认的时候是没用的,所以我们需要设置为auto,当消息正常处理时就自动确认,当抛出异常时则不会确认消息
代码如下:
首先我们还是要更改配置
spring:
rabbitmq:
addresses: amqp://student:student@62.234.46.219:5672/test
listener:
simple:
# acknowledge-mode: NONE
acknowledge-mode: AUTO
# acknowledge-mode: MANUAL
retry:
enabled: true # 开启消费者失败重试
initial-interval: 5000ms # 初始失败等待时⻓为5秒
max-attempts: 5
publisher-confirm-type: correlated #消息发送确认
我们看到一共发送了五条,那我们发现这里的ID都是一样的,因为他们还是同一条消息, 如果我们是手动确认时出现了问题重新发送,他是先重新入队列再发送,他的ID是会递增的,而重试则不会
但是我们要注意,此时我们是没有捕获异常的,就让他往上抛才会触发重试,如果我们捕获了,就不会触发重试机制 同时我们刚刚配置最多是发五次,我们在发送时是unack状态
如果我们五次后还没有正确接收到消息,消息会自动确认就会丢失
原文地址:https://blog.csdn.net/huapiaoy/article/details/145185652
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!