RabbitMQ 高级特性
目录
Spring 的 RabbitTemplate 发送持久化消息
2.4 将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
RabbitTemplate.ConfirmCallback (Spring)和 ConfirmListener (JDK)区别
rabbitTemplate.setMandatory(true);
1.消息确认
1.1 消息确认机制
⽣产者发送消息之后,到达消费端之后,可能会有以下情况:
a. 消息处理成功
b. 消息处理异常
RabbitMQ 向消费者发送消息之后,就会把这条消息删掉,那么第两种情况,就会造成消息丢失.
那么如何确保消费端已经成功接收了,并正确处理了呢?
为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,根据这个参数设置,消息确认机制分为以下两种:
• ⾃动确认: 当autoAck 等于true时,RabbitMQ 会⾃动把发送出去的消息置为确认,然后从内存(或 者磁盘)中删除,⽽不管消费者是否真正地消费到了这些消息.⾃动确认模式适合对于消息可靠性要求不⾼的场景.
• ⼿动确认:当 autoAck 等于 false 时,RabbitMQ 会等待消费者显式地调⽤ Basic.Ack 命令,回复确认信号后才从内存(或者磁盘)中移去消息.这种模式适合对消息可靠性要求⽐较⾼的场景.
消费信息的方法:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
代码示例:
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE_NAME1, true, consumer);
当 autoAck 参数置为 false,对于 RabbitMQ 服务端⽽⾔,队列中的消息分成了两个部分:
⼀. 是等待投递给消费者的消息.
⼆. 是已经投递给消费者,但是还没有收到消费者确认信号的消息. 如果 RabbitMQ ⼀直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则 RabbitMQ 会安排该消息重新进⼊队列,等待投递给下⼀个消费者,当然也有可能还是原来的那个消费者.
从 RabbitMQ 的 Web 管理平台上,也可以看到当前队列中 Ready(要发送给消费者的信息) 状态和 Unacked (发送给消费者还没有得到确认的信息)状态的消息数
Ready:等待投递给消费者的消息数
Unacked:已经投递给消费者,但是未收到消费者确认信号的消息数
1.2 手动确认方法
消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ 也提供了不同的确认应答的⽅式,消费者客户端可以调⽤与其对应的 channel 的相关⽅法,共有以下三种
1. 2.1肯定确认
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMQ 已知道该消息并且成功的处理消息.可以将其丢弃了.
参数说明:
1) deliveryTag: 消息的唯⼀标识,它是⼀个单调递增的 64 位的⻓整型值.deliveryTag 是每个通道 (Channel)独⽴维护的,所以在每个通道上都是唯⼀的.当消费者确认(ack)⼀条消息时,必须使⽤对应的通道上进⾏确认.
2)multiple:是否批量确认.在某些情况下,为了减少⽹络流量,可以对⼀系列连续的 deliveryTag 进 ⾏批量确认.值为 true 则会⼀次性 ack 所有⼩于或等于指定 deliveryTag 的消息.值为false,则只确认当前指定 deliveryTag 的消息.
deliveryTag 是 RabbitMQ 中消息确认机制的⼀个重要组成部分,它确保了消息传递的可靠性和顺序性。
1.2.2 否定确认
Channel.basicReject(long deliveryTag, boolean requeue) RabbitMQ 在 2.0.0 版本开始引⼊了 Basic.Reject 这个命令,消费者客户端可以调⽤ channel.basicReject ⽅法来告诉 RabbitMQ 拒绝这个消息. 参数说明:
1) deliveryTag: 参考channel.basicAck
2) requeue:表⽰拒绝后,这条消息如何处理. 如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存⼊队列,以便可以发送给下⼀个订阅的消费者.如果 requeue 参数设置为 false,则RabbitMQ 会把消息从队列中移除,⽽不会把它发送给新的消费者.
3. 否定确认: Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue) Basic.Reject 命令⼀次只能拒绝⼀条消息,如果想要批量拒绝消息,则可以使⽤ Basic.Nack 这个命令.消费者客户端可以调⽤ channel.basicNack ⽅法来实现.
参数介绍参考上⾯两个⽅法.multiple 参数设置为 true 则表⽰拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息
1.3 SpringBoot 代码示例
基于 SpringBoot 来演⽰消息的确认机制,使⽤⽅式和使⽤ RabbitMQ Java Client 库有⼀定差异.
Spring-AMQP 对消息确认机制提供了三种策略.
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
1. AcknowledgeMode.NONE
◦ 这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会⾃动确认 消息,从 RabbitMQ 队列中移除消息.如果消费者处理消息失败,消息可能会丢失.
2. AcknowledgeMode.AUTO(默认)
◦ 这种模式下,消费者在消息处理成功时会⾃动确认消息,但如果处理过程中抛出了异常,则不会确认消息.
3. AcknowledgeMode.MANUAL
◦ ⼿动确认模式下,消费者必须在成功处理消息后显式调⽤ basicAck ⽅法来确认消息.如果消 息未被确认, RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可⽤时重新投递该消息,这 种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,⽽是可以被重新处理.
1.3.1 配置确认机制
spring:
rabbitmq:
host: 192.168.66.129
port: 5672
virtual-host: wuyulin
username: wuyulin
password: wuyulin
listener:
simple:
#acknowledge-mode: none
#acknowledge-mode: auto
acknowledge-mode: manual
1.3.2 配置队列,交换机,绑定关系
package com.yulin.rabbitmqadvancedfeatures.configuration;
import com.yulin.rabbitmqadvancedfeatures.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2025-01-01
* Time: 23:32
*/
@Configuration
public class RabbitMQConfiguration {
/**
* 声明 ackExchange 交换机
* */
@Bean("ackExchange")
public Exchange ackExchange(){
return ExchangeBuilder.topicExchange(RabbitMQConstant.ACK_EXCHANGE_NAME).durable(true).build()
;
}
//2. 队列
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(RabbitMQConstant.ACK_QUEUE).build();
}
//3. 队列和交换机绑定 Binding
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange exchange,
@Qualifier("ackQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("ack").noargs();
}
}
1.3.3 生产者(向 rabbitmq 发送消息)
package com.yulin.rabbitmqadvancedfeatures.controller;
import com.yulin.rabbitmqadvancedfeatures.constants.RabbitMQConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2025-01-01
* Time: 23:38
*/
@RestController
@RequestMapping("/producer")
public class ProductController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack(){
rabbitTemplate.convertAndSend(RabbitMQConstant.ACK_EXCHANGE_NAME, "ack", "consumer ack test...");
return "发送成功!";
}
}
1.3.4 消费者(消费队列中的信息)
package com.yulin.rabbitmqadvancedfeatures.listener;
import com.rabbitmq.client.Channel;
import com.yulin.rabbitmqadvancedfeatures.constants.RabbitMQConstant;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created with IntelliJ IDEA.
* Description:
* User: wuyulin
* Date: 2025-01-01
* Time: 23:40
*/
@Component
public class AckQueueListener {
//指定监听队列的名称
@RabbitListener(queues = RabbitMQConstant.ACK_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息: %s, deliveryTag: %d/n",
new String(message.getBody(),"UTF-8"), deliveryTag);
//模拟处理失败
int num = 3/0;
//⼿动签收
//deliveryTag:这是一个长整型(long)参数,代表了需要确认的消息的唯一标识符。当消息从队列中被获取(通常是通过 basicConsume 方法)时,每条消息都会被分配一个 deliveryTag。
//true:这个布尔值参数表示消息是否只应被当前消费者确认。如果是 true,则消息只会被当前消费者确认,不会被其他消费者再次确认。如果是 false,则消息可能会被其他消费者确认,这在多消费者环境中有时是有用的。
channel.basicAck(deliveryTag, true);
System.out.println("处理完成");
}catch (Exception e){
//异常了就拒绝签收
//第三个参数requeue, 是否重新发送, 如果为true, 则会重新发送,,若为false, 则直接丢弃
channel.basicNack(deliveryTag, true,true);
System.out.println("处理异常");
}
}
}
2.持久性
我们如何保证当 RabbitMQ 服务停掉以后,⽣产者发送的消息不丢失呢.默认情况下,RabbitMQ 退出或者由于某种原因崩溃时,会忽视队列和消息,除⾮告知他不要这么做.RabbitMQ 的持久化分为三个部分:
交换器的持久化
队列的持久化
消息的持久化.
2.1 交换机持久化
交换器的持久化是通过在声明交换机时是将 durable 参数置为 true 实现的.相当于将交换机的属性在服务器内部保存,当 MQ 的服务器发⽣意外或关闭之后,重启 RabbitMQ 时不需要重新去建⽴交换机,交换机会⾃动建⽴,相当于⼀直存在.如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换机元数据会丢失,对⼀个⻓期使⽤的交换器来说,建议将其置为持久化的.
ExchangeBuilder.topicExchange(Constant.ACK_EXCHANGE_NAME).durable(true).build()
2.2 队列持久化
队列的持久化是通过在声明队列时将 durable 参数置为 true实现的. 如果队列不设置持久化,那么在 RabbitMQ 服务重启之后,该队列就会被删掉,此时数据也会丢失.(队列没有了,消息也⽆处可存了) 队列的持久化能保证该队列本⾝的元数据不会因异常情况⽽丢失,但是并不能保证内部所存储的消息不会丢失.要确保消息不会丢失,需要将消息设置为持久化.我前⾯博客⽤的创建队列的⽅式都是持久化的
QueueBuilder.durable(Constant.ACK_QUEUE).build();
点进去看源码会发现,该⽅法默认 durable 是 true
public static QueueBuilder durable(String name) {
return (new QueueBuilder(name)).setDurable();
}
private QueueBuilder setDurable() {
this.durable = true;
return this;
}
通过下⾯代码,可以创建非持久化的队列
QueueBuilder.nonDurable(Constant.ACK_QUEUE).build();
2.3 消息持久化
消息实现持久化,需要把消息的投递模式( MessageProperties 中的 deliveryMode )设置为2, 也就是 MessageDeliveryMode.PERSISTENT
public enum MessageDeliveryMode {
NON_PERSISTENT,//⾮持久化
PERSISTENT;//持久化
}
设置了队列和消息的持久化,当 RabbitMQ 服务重启之后,消息依旧存在.
如果只设置队列持久化,重启之后消息会丢失.
如果只设置消息的持久化,重启之后队列消失,继⽽消息也丢失.所以单单设置消息 持久化⽽不设置队列的持久化显得毫⽆意义.
Java 原生API 发送非持久化和持久化消息
//⾮持久化信息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
//持久化信息
channel.basicPublish("",QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
Spring 的 RabbitTemplate 发送持久化消息
// 要发送的消息内容
String message = "This is a persistent message";
// 创建⼀个Message对象,设置为持久化
Message messageObject = new Message(message.getBytes(), new
MessageProperties());
messageObject.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 使⽤RabbitTemplate发送消息
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE_NAME, "ack",
messageObject);
RabbitMQ 默认情况下会将消息视为持久化的,除非队列被声明为非持久化,或者消息在发送时被标记为非持久化
注意:将所有的消息都设置为持久化,会严重影响 RabbitMQ 的性能(随机).、写⼊磁盘的速度⽐写⼊内存的速度慢得不只⼀点点.对于可靠性不是那么⾼的消息可以不采⽤持久化处理以提⾼整体的吞吐量.在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做⼀个权衡.
2.4 将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
答案是否定的.
1. 从消费者来说,如果在订阅消费队列时将 autoAck 参数设置为 true,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失.这种情况很好解决,将 autoAck 参数设置为 false,并进⾏⼿动确认.
2. 在持久化的消息正确存⼊RabbitMQ之后,还需要有⼀段时间(虽然很短,但是不可忽视)才能存⼊磁盘 中.RabbitMQ 并不会为每条消息都进⾏同步存盘(调⽤内核的 fsync ⽅法)的处理,可能仅仅保存到操作系统缓存之中⽽不是物理磁盘之中.如果在这段时间内 RabbitMQ 服务节点发⽣了宕机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失.
这个问题怎么解决呢 ?
1. 引⼊RabbitMQ 的仲裁队列(后⾯再讲),如果主节点(master)在此特殊时间内挂掉,可以⾃动切换到从节点(slave),这样有效地保证了⾼可⽤性,除⾮整个集群都挂掉(此⽅法也不能保证100%可靠,但是配置了仲裁队列要⽐没有配置仲裁队列的可靠性要⾼很多,实际⽣产环境中的关键业务队列⼀般都会设置仲裁队列).
2. 还可以在发送端引⼊事务机制或者发送⽅确认机制来保证消息已经正确地发送并存储⾄RabbitMQ 中,详细参考下⼀个章节内容介绍--"发送⽅确认"
3. 发送方确认
在使⽤ RabbitMQ 的时候,可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失,但是还有⼀个问题,当消息的⽣产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?
如果在消息到达服务器之前已经丢失(比如 RabbitMQ 重启,那么 RabbitMQ 重启期间⽣产者消息投递失败),持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化? RabbitMQ 为我们提供了两种解决⽅案:
a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm)机制实现
事务机制⽐较消耗性能,在实际⼯作中使⽤也不多,咱们主要介绍 confirm 机制来实现发送⽅的确认.RabbitMQ 为我们提供了两个⽅式来控制消息的可靠性投递。
1. confirm 确认模式
2. return 退回模式
3.1 confirm 确认模式
Producer 在发送消息的时候,对发送端设置⼀个 ConfirmCallback 的监听,⽆论消息是否到达 Exchange,这个监听都会被执⾏,如果 Exchange 成功收到,ACK( Acknowledge character ,确认 字符)为 true,如果没收到消息,ACK 就为 false.
步骤如下:
1. 配置RabbitMQ
2. 设置确认回调逻辑并发送消息
接下来看实现步骤
3.1.1 配置 RabbitMQ
# 应用服务 WEB 访问端口
server:
port: 8081
spring:
rabbitmq:
host: 192.168.66.129
port: 5672
virtual-host: wuyulin
username: wuyulin
password: wuyulin
#配置消息确认机制
listener:
simple:
# 这种模式下,消息⼀旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ 就会⾃动确认 消息,从 RabbitMQ 队列中移除消息.如果消费者处理消息失败,消息可能会丢失.
#acknowledge-mode: none
# (默认)这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息.
#acknowledge-mode: auto
# ⼿动确认模式下,消费者必须在成功处理消息后显式调用 basicAck ⽅法来确认消息.如果消息未被确认, RabbitMQ 会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提⾼了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理.
acknowledge-mode: manual
# 配置发送方确认机制
publisher-confirm-type: correlated
3.1.2. 设置确认回调逻辑并发送消息
⽆论消息确认成功还是失败,都会调⽤ ConfirmCallback 的 confirm ⽅法.如果消息成功发送到MQ, ack 为 true.如果消息发送失败,ack 为 false,并且 cause 提供失败的原因.
注入一个新的 RabbitTemplate 对象,设置发送方确认,并确认回调逻辑
@Configuration
public class RabbitMQConfiguration {
//如果单独注入新的 confirmRabbitTemplate 对象,会将普通的 rabbitTemplate 也改掉,所以还需要注入一个普通的 rabbitTemplate 对象
//普通的 RabbitTemplate
@Bean("rabbitTemplate")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
//配置了发送方确认机制的 RabbitTemplate
@Bean("confirmRabbitTemplate")
public RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.printf("");
if (ack){
System.out.printf("消息接收成功, id:%s \n", correlationData.getId()
原文地址:https://blog.csdn.net/q322359/article/details/144872977
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!