RabbitMQ 高级特性——发送方确认
前言
前面我们学习了 RabbitMQ 中交换机、队列和消息的持久化,这样能够保证存储在 RabbitMQ Broker 中的交换机和队列中的消息实现持久化,就算 RabbitMQ 服务发生了重启或者是宕机,也不会导致交换机和消息的丢失。那么这个机制是保证存储在 RabbitMQ Broker 中的可靠性,但是对于生产者发送的消息如果都到达不了 RabbitMQ 的话,那么这些持久化操作也就没有意义了,那么对于生产者发送的消息,生产者如何知道消息是否已经成功到达 RabbitMQ Broker 了呢?这里就需要用到 RabbitMQ 发送发确认这个特性了,前面我们大概的讲了一下 RabbitMQ Java Client 中的 Publisher/confirm 发送方确认,那么这篇文章我们将学习在 SpringBoot 中如何实现发送方确认。
发送方确认
其实对于上面的问题,RabbitMQ 为我们提供了两种解决方案:
- 通过事务机制实现
- 通过发送方确认机制实现
因为使用事务机制的话比较消耗性能,在实际工作中使用的不多,所以我们就主要介绍发送方确认的机制来实现发送方的确认。并且对于发送方确认的机制 RabbitMQ 也为我们提供了两个方式来控制消息的可靠性。
- confirm 确认模式
- return 退回模式
confirm 模式是确认消息是否到达指定的 Exchange 交换机的,而 return 退回模式则是确认消息是否到达指定队列的。
confirm 确认模式
Producer 在发送消息的时候,对发送端设置一个 ConfirmCallback 的监听,无论消息是否到达 Exchange,这个监听都会被执行,如果 Exchange 成功收到,ACK (Ackonwledge character 确认字符)为 true,如果没有收到消息,ACK 就为 false。
那么下面我们就来看看在 SpringBoot 中如何实现 confirm 确认模式:
首先在配置文件中配置信息:
spring:
rabbitmq:
publisher-confirm-type: correlated #消息发送确认
然后设置确认回调函数的内容并且发送消息:
无论消息是否成功送到,都会执行这个回调函数,确认消息是否成功送达的判断依据就是 ACK 的值:
public class Constants {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE = "confirm.queue";
}
@Configuration
public class RabbitConfig {
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();
}
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();
}
@Bean("confirmBinding")
public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("confirm");
}
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
//创建新的RabbitTemplate对象,并且设置confirm回调函数
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
}else {
System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
}
}
});
return rabbitTemplate;
}
}
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/confirm")
public String confirm() {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm","rabbitmq confirm",correlationData);
return "消息确认成功";
}
}
然后我们指定交换机的时候,如果指定一个不存在的交换机,也就是消息无法到达指定的交换机,那么看看时候会执行确认回调函数:
rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE + 1,"confirm","rabbitmq confirm",correlationData);
2024-08-13 14:47:52.646 ERROR 11252 — [3.57.1.114:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
消息接受失败,id:1,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange ‘confirm.exchange1’ in vhost ‘test’, class-id=60, method-id=40)
可以看到,如果消息没有到达指定的交换机,那么也是会执行相应的回调函数的。
public interface ConfirmCallback {
/**
* 确认回调
* @param correlationData: 发送消息时的附加信息, 通常⽤于在确认回调中识别特定的消
息
* @param ack: 交换机是否收到消息, 收到为true, 未收到为false
* @param cause: 当消息确认失败时,这个字符串参数将提供失败的原因.这个原因可以⽤于调
试和错误处理.
* 成功时, cause为null
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack,
@Nullable String cause);
}
RabbitTemplate.ConfirmCallback 和 ConfirmListener 的区别:
- RabbitTemplate.ConfirmCallback:这是Spring AMQP库提供的一个回调接口,主要用于在使用RabbitTemplate发送消息时,接收来自RabbitMQ服务器的确认信息。这些确认信息表明消息是否已成功发送到RabbitMQ的交换机(Exchange)。
- ConfirmListener:这个接口或功能更多是直接与RabbitMQ的Channel相关,而不是直接通过Spring AMQP的RabbitTemplate来使用的。它用于监听RabbitMQ Channel上的消息确认事件,包括消息的ACK(确认)和NACK(不确认)。这种方式通常需要更底层的操作,直接处理RabbitMQ的Channel和连接。
return 退回模式
当消息成功到达 Exchange 交换机的时候,交换机会根据路由规则匹配对应的队列,将消息路由到指定的队列,在消息从 Exchange 到 Queue 的过程中,如果一条消息无法被任何队列消费(即没有队列与消息的 Routing Key 匹配或者队列不存在等),可以选择把消息退回给发送者,消息退回给发送者时,我们可以设置一个返回回调方法,对消息进行处理。
那么使用 SpringBoot 如何实现 return 退回模式呢?
首先还是需要进行配置,配置和上面的 confirm 模式是一样的:
spring:
rabbitmq:
publisher-returns: true #设置回退
设置返回回调逻辑并发送消息:
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.printf("消息接收成功,id:%s\n",correlationData.getId());
}else {
System.out.printf("消息接受失败,id:%s,cause:%s",correlationData.getId(),cause);
}
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.printf("消息被退回:%s",returnedMessage);
}
});
return rabbitTemplate;
}
setConfirmCallback() 和 setReturnCallback() 方法可以同时存在也可以单独设置。
rabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE,"confirm1","rabbitmq confirm",correlationData);
发送消息的时候,我们的 Routing Key 设置为没有 Binding Key 与之匹配的,然后来看看这个 returnCallback 是否会被执行:
消息被退回:ReturnedMessage [message=(Body:‘rabbitmq confirm’ MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=confirm.exchange, routingKey=confirm1]
消息接收成功,id:1
消息成功到达了 Exhcange,但是没有到达指定的队列,所以执行了 returnCallback 方法。
public class ReturnedMessage {
//返回的消息对象,包含了消息体和消息属性
private final Message message;
//由Broker提供的回复码, 表⽰消息⽆法路由的原因. 通常是⼀个数字代码,每个数字代表不同的含义.
private final int replyCode;
//⼀个⽂本字符串, 提供了⽆法路由消息的额外信息或错误描述.
private final String replyText;
//消息被发送到的交换机名称
private final String exchange;
//消息的路由键,即发送消息时指定的键
private final String routingKey;
}
常见面试题
如何保证 RabbitMQ 消息的可靠传输:
从这个图中可以看出,消息可能丢失的场景以及解决方案:
-
生产者将消息发送到RabbitMQ失败
a. 可能原因: 网络问题等
b. 解决办法: 参考本章节[发送方确认-confirm确认模式] -
消息在交换机中无法路由到指定队列:
a. 可能原因: 代码或者配置层面错误,导致消息路由失败
b. 解决办法: 参考本章节[发送方确认-return模式] -
消息队列自身数据丢失
a. 可能原因: 消息到达RabbitMQ之后,RabbitMQ Server宕机导致消息丢失。
b. 解决办法: 参考本章节[持久性]。开启RabbitMQ持久化,就是消息写入之后会持久化到磁盘,如果RabbitMQ挂了,恢复之后会自动读取之前存储的数据。(极端情况下,RabbitMQ还未持久化就挂了,可能导致少量数据丢失,这个概率极低,也可以通过集群的方式提高可靠性) -
消费者异常,导致消息丢失
a. 可能原因: 消息到达消费者,还没来得及消费,消费者宕机。消费者逻辑有问题。
b. 解决办法: 参考本章节[消息确认]。RabbitMQ提供了消费者应答机制来使RabbitMQ能够感知到消费者是否消费成功消息。默认情况下消费者应答机制是自动应答的,可以开启手动确认,当消费者确认消费成功后才会删除消息,从而避免消息丢失。除此之外,也可以配置重试机制(参考下一章节),当消息消费异常时,通过消息重试确保消息的可靠性。
原文地址:https://blog.csdn.net/m0_73888323/article/details/141161090
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!