RabbitMQ基础篇
文章目录
1 RabbitMQ概述
RabbitMQ简易安装教程
# 拉取镜像
docker pull rabbitmq:3.13-management
# -d 参数:后台运行 Docker 容器
# --name 参数:设置容器名称
# -p 参数:映射端口号,格式是“宿主机端口号:容器内端口号”。5672供客户端程序访问,15672供后台管理界面访问
# -v 参数:卷映射目录
# -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-v rabbitmq-plugin:/plugins \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3.13-management
docker exec -it 5129c41ad3d8 /bin/bash # 数字是rabbitmq的id,可以通过docker ps查看
rabbitmq-plugins enable rabbitmq_management #启用 RabbitMQ Management 插件,使得你可以轻松地监控和管理 RabbitMQ 服务器
访问登录:http://192.168.145.130:15672,账号密码就是上面指定的
1.1 消息队列
消息队列是实现应用程序之间通信的中间件
消息队列的好处
- 消息的发送者和接收者进行异步通信
- 流量高峰保证服务稳定,消息队列可以暂存大量消息,达到流量削峰
- 扩展性高,可以水平扩展以支持更多的发送者和接收者,相应地增加或减少资源处理(功能处理)
- 解耦:消息的发送者和接收者只专注于消息,无需关系彼此细节
主流MQ对比
1.2 RabbitMQ体系结构
-
Channel(信道)
:信道是生产者消费者和RabbitMQ服务器之间通信的桥梁。所有的消息发布和消费都由信道来完成的- 建立在TCP连接上的虚拟连接,允许在单个TCP连接上建立多个信道,从而实现多线程处理
- 每个线程对应一个信道,信道在RabbitMQ中具有唯一的ID,保证了信道的私有性
- 引入信道的概念是为了减少建立和销毁TCP连接的开销,提高系统性能
-
Exchange(交换机)
:负责接收消息并根据路由键将消息转发到绑定的队列 -
Queue(队列)
:队列是RabbitMQ
中用于存储消息的容器,消息按照先进先出的顺序进行处理 -
Virtual Host(虚拟主机)
:是RabbitMQ中的命名空间(理解为分组),用于隔离不同的环境或应用程序。每个虚拟主机都有自己的队列、交换机和绑定关系 -
Broker(代理服务器)
:指RabbitMQ服务器本身,多个Broker组合成一个RabbitMQ集群
2 RabbitMQ工作模式
- 简单模式:生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
- 工作队列模式:生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
- 发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
- 消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
- 消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
项目导入依赖:采用原生的方式,开发中都是集成框架的
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>
封装连接工具类:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static final String HOST_ADDRESS = "192.168.145.160";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
2.1 简单模式(Simple Queue)
生产者向默认交换机发送消息,默认交换机将消息放到队列中,消费者监听并消费消息
生产者:发送消息
public class Producer {
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;小兔子!";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}
运行效果:新增队列:simple_queue
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
// 获取连接
Connection connection = ConnectionUtil.getConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 创建队列
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// 参数1. queue:队列名称
// 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// 参数3. exclusive:是否独占。
// 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// 参数5. arguments:其它参数。
channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}
控制台打印:
消息被消费掉了,所以RabbitMQ服务器上没有了
2.2 工作队列模式(Work Queues)
生产者向默认交换机发送消息,默认交换机将消息放到消息队列,多个消费者监听消息队列竞争消息(其实就是简单模式的升级版)
生产者
public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
for (int i = 1; i <= 10; i++) {
String body = i+"hello rabbitmq~~~";
channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
}
channel.close();
connection.close();
}
}
发送消息:
消费者1:
public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
消费者2:
public class Consumer2 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer2 body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
注意:运行的时候先启动两个消费端程序,然后再启动生产者端程序
运行结果:两个消费者竞争消息队列中消息
2.3 发布/订阅模式(Publish/Subscribe)
rabbitmq消息通讯过程:消息生产者将消息发送给交换机,由交换机处理消息。Exchange(交换机)只负责转发消息,不存储消息,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
常见的交换机类型
- Fanout Exchange(扇出交换机),将消息发送给所有绑定到交换机的队列
- Direct Exchange(直连交换机),把消息交给符合指定routing key的队列
- Topic Exchange(主题交换机),把消息交给符合routing pattern(路由模式)的队列
- Default Exchange(默认交换机),把消息发送给指定队列
发布/订阅模式:扇出交换机接收消息并将消息发送给所有订阅了该交换机的队列
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
String exchangeName = "test_fanout";
// 3、创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}
消费者1:
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
先启动两个消费者,再启动生产者发送消息
交换机和队列的绑定关系如下图所示:
发布订阅模式与工作队列模式的区别:
- 工作队列模式消息由默认交换机处理,发布订阅模式消息由指定交换机处理
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
2.4 路由模式(Routing)
消息生产者将消息发送给Direct交换机,Direct交换机根据routing key路由键将消息发送到指定队列(用的最多)
当Direct交换机用相同的路由键routing key绑定多个队列,就会有广播效果(类似发布订阅)
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name, exchangeName, "error");
// 队列2绑定info error warning
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName, "warning", null, message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}
消费者1:
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2:
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
先启动两个消费者,再启动生产者
绑定关系:
消费者2接受到消息,消费者1没有消息
2.5 通配符模式(Topics)
消息生产者将消息发送给Topic交换机,Topic交换机根据通配符形式的路由键将消息发送到指定队列
(通配符规则:
#
:匹配零个或多个词,*
:匹配一个词)
生产者:
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());
channel.close();
connection.close();
}
}
消费者1监听消息队列1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
消费者2监听消息队列2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
先启动两个消费者,接着启动生产者发送消息
3 RabbitMQ整合SpringBoot
项目基本四步骤基本步骤:建module,改POM,写YAML,主启动
3.1 @RabbitListener注解属性
-
bindings
属性:指定交换机和队列之间的绑定关系,指定当前方法要监听的队列隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们
-
queues
属性@RabbitListener(queues = {QUEUE_LINZHUOWEI})
- 作用:指定当前方法要监听的队列
- 此时框架不会创建相关交换机和队列,必须提前创建好
3.2 消费者工程
-
建module:
module06-boot-consumer
-
改POM
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--引入web模块为了保证项目一直运行,持久监听消息队列消息--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
-
写YAML
spring: rabbitmq: host: 192.168.145.130 port: 5672 username: guest password: 123456 virtual-host: / logging: level: com.linzhuowei.mq.listener.MyMessageListener: info
-
主启动:正常添加
@SpringBootApplication
-
监听器:
import lombok.extern.slf4j.Slf4j; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; @RabbitListener(bindings = @QueueBinding( //队列信息 value = @Queue(value = QUEUE_NAME, durable = "true"), //交换机信息 exchange = @Exchange(value = EXCHANGE_DIRECT), //路由键信息,赋值为字符串数组 key = {ROUTING_KEY} )) public void processMessage( //对应消息数据本身,形参类型需要和发送消息的数据类型对应 String data, //对应发送的消息本身,可以通过message获取消息数据,包括data Message message, //频道对象 Channel channel) { log.info(data); } }
运行查看后台管理:
如图:交换机exchange.direct.order
通过order
路由键绑定消息队列
题外话:
- 使用
@RabbitListener
的bindings
属性能绑定交换机和队列的关系并监听队列消息,如果RabbitMQ
服务中没有交换机和队列,则会自动创建该队列- 使用
@RabbitListener
的queues
属性,监听指定消息队列所以如果只是单纯监听消息队列,不考虑交换机和队列的创建以及绑定(因为这些创建操作可以在后台页面点击完成嘿嘿),消费者代码也可以这样写:
@RabbitListener(queues = {QUEUE_LINZHUOWEI}) public void processMessage( //对应消息数据本身,形参类型需要和发送消息的数据类型对应 String data, //对应发送的消息本身,可以通过message获取消息数据,包括data Message message, //频道对象 Channel channel) { log.info(data); }
但建议还是写第一种
3.3 生产者工程
-
新建模块:
module05-boot-producer
-
改POM
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
-
写YAML
spring: rabbitmq: host: 192.168.145.130 port: 5672 username: guest password: 123456 virtual-host: /
-
主启动
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQProducerMainType { public static void main(String[] args) { SpringApplication.run(RabbitMQProducerMainType.class, args); } }
-
测试程序
@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello linzhuowei"); } }
4 消息可靠性投递
4.1 什么是消息可靠投递?
消息可靠投递是确保消息从生产者发送到消息队列,再从消息队列消费到消费者的过程中,不丢失消息或重复处理消息
消息可靠投递主要三个方面:
- 消息的可靠发送(生产者 -> 消息队列)
- 消息的可靠存储(消息队列内部存储)
- 消息的可靠消费(消息队列 -> 消费者)
下面分别说这三个部分
4.2 消息的可靠发送
通过消息发送回调接口或备用交换机保证消息从生产者成功发送到消息队列中
4.2.1 消息确认机制
应答确认+ 失败重试
生产者发送消息后等待消息队列的响应,确保消息成功送达,如果发送失败可以尝试重新发送
①模块准备
-
新建模块:
module07-confirm-producer
-
改POM
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
-
主启动
-
写YAML:启用消息确认机制
spring: rabbitmq: host: 192.168.145.130 port: 5672 username: guest password: 123456 virtual-host: / publisher-confirm-type: CORRELATED # 交换机的确认 publisher-returns: true # 队列的确认 logging: level: com.linzhuowei.mq.config.MQProducerAckConfig: info
②配置类说明
通过配置类设置RabbitTemplate的回调接口,通过回调方法获取RabbitMQ服务器返回的确认信息,实现消息确认机制
代码实现过程:配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
两个接口两个回调方法,是否发送到交换机和是否发送到消息队列
方法名 | 方法功能 | 所属接口 | 接口所属类 |
---|---|---|---|
confirm() | 确认消息是否发送到交换机 | ConfirmCallback | RabbitTemplate |
returnedMessage() | 确认消息是否发送到队列 | ReturnsCallback | RabbitTemplate |
-
ConfirmCallback接口(RabbitTemplate内部的接口)
/** * A callback for publisher confirmations. * */ @FunctionalInterface public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause); }
生产者端发送消息之后,回调
confirm()
方法- ack参数值为true:表示消息成功发送到了交换机
- ack参数值为false:表示消息没有发送到交换机
-
ReturnCallback接口(RabbitTemplate内部的接口)
/** * A callback for returned messages. * * @since 2.3 */ @FunctionalInterface public interface ReturnsCallback { /** * Returned message callback. * @param returned the returned message and metadata. */ void returnedMessage(ReturnedMessage returned); }
接口中的
returnedMessage()
方法仅在消息没有发送到队列时调用
ReturnedMessage
类中主要属性含义如下:
属性名 | 类型 | 含义 |
---|---|---|
message | org.springframework.amqp.core.Message | 消息以及消息相关数据 |
replyCode | int | 应答码,类似于HTTP响应状态码 |
replyText | String | 应答码说明 |
exchange | String | 交换机名称 |
routingKey | String | 路由键名称 |
③配置类示例
配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到交换机成功!数据:" + correlationData);
} else {
log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息发送到消息队列失败...");
log.info("消息主体: " + new String(returned.getMessage().getBody()));
log.info("应答码: " + returned.getReplyCode());
log.info("描述:" + returned.getReplyText());
log.info("消息使用的交换器 exchange : " + returned.getExchange());
log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
}
}
④测试代码
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu");
}
}
通过调整代码,测试如下三种情况:
- 交换机正确、路由键正确
- 交换机正确、路由键不正确,无法发送到队列
- 交换机不正确,无法发送到交换机
回顾交换机作用:接收消息并路由到消息队列
4.2.2 备用交换机
①备用交换机配置
当消息在队列中未被处理时(如消息过期、消息被拒绝或达到最大重试次数,无匹配队列等),这些消息就会转发到备用交换机
本次案例模拟交换机没有匹配的消息队列,消息转至备用交换机
-
首先创建备用交换机(扇出类型):
exchange.direct.order.backup
-
创建备用消息队列
queue.order.backup
-
将备用消息队列绑定备用交换机
\
-
重新创建原交换机(置顶备用交换机)
exchange.direct.order
需要删除原来的直连交换机,重新创建直连交换机,并设置备用交换机
exchange.direct.order.backup
-
原交换机绑定原队列
②备用交换机测试
消息发送端:
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
//exchange.direct.order交换机绑定的路由键为order,这里order路由键错误,会转到备用交换机
public static final String ROUTING_KEY = "order1";
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY+"11",
"Hello 备用交换机");
}
}
结果:消息发送成功,消息先发往直连交换机exchange.direct.order
,由于路由键无效,没有匹配的消息队列,所以消息发往备用的扇出交换机exchange.direct.order.backup
,最终发送到消息队列queue.test.backup
4.3 消息的可靠存储
通过将消息持久化到硬盘上防止消息队列宕机导致内存中消息丢失(交换机默认持久化,消息队列有指定也是默认持久化)
4.3.1 非持久化交换机和队列
即消息在内存存储,重启消息丢失
-
创建非持久化交换机
-
创建非持久化消息队列
-
绑定交换机和消息队列的关系
测试:发送消息后,队列成功收到消息。
docker restart rabbitmq
重启rabbitmq,内存的消息丢失,内存掉电设备
4.3.2 持久化交换机和消息队列
先来看卡监听消息队列的写法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage(String dateString,
Message message,
Channel channel) {
log.info(dateString);
}
关注@RabbitListener
中,@QueueBinding
中的value和exchange两个注解,分别是Queue
和Exchange
类型
①@Queue注解分析
@Queue
注解抽出关注的部分
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
/**
* Specifies if this queue should be durable.
* By default if queue name is provided it is durable.
* @return true if the queue is to be declared as durable.
* @see org.springframework.amqp.core.Queue#isDurable()
*/
String durable() default "";
/**
* Specifies if this queue should be auto deleted when not used.
* By default if queue name is provided it is not auto-deleted.
* @return true if the queue is to be declared as auto-delete.
* @see org.springframework.amqp.core.Queue#isAutoDelete()
*/
String autoDelete() default "";
}
durable
属性:By default if queue name is provided it is durable
autoDelete
属性:By default if queue name is provided it is not auto-deleted
翻译就是:只要消息队列指定,默认持久化且不自动删除
②@Exchange注解分析
@Exchange
注解抽出有用的部分
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
/**
* @return false if the exchange is to be declared as non-durable.
*/
String durable() default TRUE;
/**
* @return true if the exchange is to be declared as auto-delete.
*/
String autoDelete() default FALSE;
}
durable
属性默认true:false if the exchange is to be declared as non-durable
autoDelete
属性默认false:true if the exchange is to be declared as auto-delete
交换机默认持久化
4.4 消息的可靠消费
消息确认机制
- 自动确认:消费者接收消息后自动返回ACK确认,RabbitMQ删除消息。自动确认机制,消息处理失败会导致消息丢失(因为消息已删)
- 手动确认:消费者处理消息成功后,显式发送ACK给消息队列,通知RabbitMQ消息成功消费删除消息,消费者处理消息失败后,显示发送NACK给消息队列,通知RabbitMQ消息消费失败,执行相应的失败策略。手动确认机制保证消息的可靠消费
4.4.1 模块准备
-
POM
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
-
YAML:开启手动确认机制
spring: rabbitmq: host: 192.168.145.130 port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual # 把消息确认模式改为手动确认
-
主启动
-
消息监听:其实
durable
和autoDelete
可以不设置,默认值就是这样的@Component public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order"; public static final String ROUTING_KEY = "order"; public static final String QUEUE_NAME = "queue.order"; // 修饰监听方法 @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage(String dataString, Message message, Channel channel) { } }
4.4.2 手动确认思路
- 步骤1:YAML配置文件把消息确认模式改为手动确认
- 步骤2:调用Channel对象的方法返回信息
- ACK:Acknowledgement,表示消息处理成功
- NACK:Negative Acknowledgement,表示消息处理失败
- Reject:拒绝,同样表示消息处理失败
- 步骤3:拒绝或者消息处理失败的后续操作
- requeue为true:重新放回队列,重新投递,再次尝试
- requeue为false:不放回队列,不重新投递
①basicAck()方法
- 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
②basicNack()方法
- 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean multiple | 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
③basicReject()方法
- 方法功能:根据指定的deliveryTag,对该消息表示拒绝
- 参数列表:
参数名称 | 含义 |
---|---|
long deliveryTag | Broker给每一条进入队列的消息都设定一个唯一标识 |
boolean requeue | 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列 |
basicNack()和basicReject()有啥区别?
- basicNack()有批量操作
- basicReject()没有批量操作
Fanout交换机,同一个消息广播到不同的队列,deliveryTag会重复吗?不会,deliveryTag在Broker范围内唯一
4.4.3 可靠消费代码
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@Slf4j
public class MyMessageListener {
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";
// 修饰监听方法
@RabbitListener(
// 设置绑定关系
bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 1、获取当前消息的 deliveryTag 值备用
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
// System.out.println(10 / 0);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 4、获取信息,看当前消息是否曾经被投递过
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (!redelivered) {
// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
channel.basicNack(deliveryTag, false, true);
} else {
// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
channel.basicReject(deliveryTag, false);
}
}
}
}
4.5 消息可靠性投递架构
MQ是系统解耦利器,能很好解除消息发送者和消息接收者之间的耦合。如何保证消息可靠性投递?通过前面我们知道主要分消息可靠发送,消息可靠存储,消息可靠消费。这一小节我们用另一个角度分析
要保证消息可靠性投递,我们分上下两个半场
-
上半场123分别对应:1发送方调用主动API发送消息,2MQ服务端收到消息并将消息落库(持久化),3发送方收到回调ACK(确认消息成功投递到MQ服务器)
timer起作用:步骤 3,如果发送方没有收到回调确认(比如服务端由于网络问题或者其他原因未能正确发送 ACK),则发送方会启动一个定时器,尝试重新发送消息。如果多次发送失败(超时),发送方会向业务方回调发送失败,这通常是在重试机制达到最大次数或超时后触发的
-
下半场456分别对应:4消费端接收消息处理业务逻辑,5接收方(消费者)发送 ACK 回应消息处理成功,6MQ服务端收到ACK并将库中的消息删除
timer起作用:步骤 5,消费者没有及时发送 ACK(比如消费者处理超时或发生了异常),MQ 服务端会启动定时器等待 ACK
如果 MQ 服务端在规定时间内没有收到消费者的 ACK,
timer
会触发重试机制,可能重新将消息投递到消费者,只到确认消息被处理并收到 ACK 后,消息才会从 MQ 服务端的持久化存储中删除,以确保消息的可靠性
上下半场均有重发,重发策略有定时重发(如每个10s重发直到超出次数)和指数退避(X秒重发,2X秒重发,4X秒重发)
综合来看关键点在于如何保证消息幂等
- 上半场消息幂等:发送方没有收到回调ACK,会重新发送消息到MQ服务器。上半场的消息幂等性有MQ服务器完成,MQ会为每条消息生成全局唯一的message ID用作去重和幂等依据(上半场消息幂等由MQ服务器完成无需关注)
- 下半场消息幂等:MQ服务端超时未收到ACK,导致MQ重复投递消息。业务方会收到重复消息,业务方需要保证消息幂等性。比如消息携带全局唯一id用于保证幂等,再处理消息前判断即可
5 消费端限流
利用消息队列的削峰限流,平滑流量避免大量请求涌入,限制请求数量,避免对后端服务造成过大的压力
常见的削峰限流策略有:
通过prefetch来设置消费者**同时接收未确认的消息的数量**,每次预取的消息数量来实现流量削峰
5.1 未设置prefetch
首先向消息队列中发送100个消息
public void testSendMessage() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu" + i);
}
}
- Ready100
- Unack0
- Total100
消息消费者监听对应的消息队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage(String dataString,
Message message,
Channel channel) throws InterruptedException, IOException {
// 正常业务操作
log.info("消费端接收到消息内容:" + dataString);
//如果不睡1秒,瞬间为0
TimeUnit.SECONDS.sleep(1);
//手动确认ACK
channel.basicAck(deliveryTag, false);
}
显示结果:Ready直接为0,Unack和Total逐渐减少直到0
5.2 设置prefetch
修改YAML
spring:
rabbitmq:
host: 192.168.145.130
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息(同时接收未确认的消息的数量)
首先发送消息:
public void testSendMessage() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello atguigu" + i);
}
}
消息消费者监听对应的消息队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage(String dataString,
Message message,
Channel channel) throws InterruptedException, IOException {
// 正常业务操作
log.info("消费端接收到消息内容:" + dataString);
//如果不睡1秒,瞬间为0
TimeUnit.SECONDS.sleep(1);
//手动确认ACK
channel.basicAck(deliveryTag, false);
}
效果,监听者每次只取一个消息消费,同时未确认消息只有prefetch
个
6 消息超时
设置过期时间,消息超过过期时间自动删除(更准确的说超时消息会变成死信)
可通过两个层面设置过期时间
- 队列层面:设置队列的消息过期时间,队列内的消息超出过期时间自动删除
- 消息层面:设置具体某个消息的过期时间,消息超出过期时间自动删除
如果两个层面都有设置,以过期时间短的为准
6.1 队列层面设置过期时间
创建交换机
创建消息队列,并设置过期时间10000毫秒
绑定交换机
发送消息,不启动消费端,等待消息过期
6.2 消息层面设置过期时间
MessagePostProcessor 是 Spring Framework 的接口,在消息发送前对消息进行处理和修改。通过接口MessagePostProcessor接口在消息层面设置过期时间
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
@Test
public void testSendMessageTTL() {
// 1、创建消息后置处理器对象
MessagePostProcessor messagePostProcessor = (Message message) -> {
// 设定 TTL 时间,以毫秒为单位
message.getMessageProperties().setExpiration("5000");
return message;
};
// 2、发送消息
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello linzhuowei", messagePostProcessor);
}
原来消息队列queue.test.timeout
过期时间10000毫秒,消息层面设置过期时间5000毫秒,以短的过期时间为标准,发送消息,等待消息过期
7 死信和死信队列
无法正常被消费的消息就称为死信
死信的原因有三种(就是消息没有被正常消费):
- 拒绝:消费者拒绝消息,
basicNack()/basicReject()
,并且不把消息重新放回原目标队列(requeue=false
) - 超时:消息达到超时时间未被消费
- 溢出:队列中消息数量达到最大限制,根据队列先进先出原理,后来再进入一条消息,队列中最早的消息会变成死信
死信的处理方式大致三种:
- 丢弃:不处理,死信直接丢弃
- 入库:死信写入数据库,日后处理
- 监听:死信进入死信队列,消费端监听死信队列,做后序处理(通常采用)
下面分别演示三种死信成因
7.1 准备工作
7.1.1 正常交换机和正常消息队列
- 正常交换机:exchange.normal.video
- 正常队列:queue.normal.video
- 正常路由键:routing.key.normal.video
-
创建正常交换机
-
创建正常队列,写好死信队列和死信交换机
-
绑定正常消息队列和正常交换机
完成设施后设置如下
7.1.2 死信交换机和死信队列
- 死信交换机:exchange.dead.letter.video
- 死信队列:queue.dead.letter.video
- 死信路由键:routing.key.dead.letter.video
-
创建死信交换机
-
创建死信队列
-
绑定死信队列和死信交换机
7.1.3 常量声明
public static final String EXCHANGE_NORMAL = "exchange.normal.video";
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";
public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";
public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";
7.2 死信–拒绝
-
发送端发送消息到正常交换机
@Test public void testSendMessageButReject() { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "★[normal]发送消息--正常交换机--正常消息队列..."); }
-
消费端监听正常消息队列和死信队列
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @Slf4j public class DeadLetterListener { public static final String QUEUE_NORMAL = "queue.normal.video"; public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video"; @RabbitListener(queues = {QUEUE_NORMAL}) public void processMessageNormal(Message message, Channel channel) throws IOException { // 消费端监听正常消息队列,接收并拒绝消息 log.info("★[normal]接收消息,但拒绝消息且不重新放入队列..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } @RabbitListener(queues = {QUEUE_DEAD_LETTER}) public void processMessageDead(String dataString, Message message, Channel channel) throws IOException { // 消费端监听死信队列,接收并成功消费消息 log.info("★[dead letter]监听死信队列,接收到死信消息..."); log.info("★[dead letter]dataString = " + dataString); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
先启动消费端监听死信队列和正常队列,再向正常消息队列发送消息
过程:发送端将消息发送到正常消息队列,监听正常消息队列的消费者接收消息并拒绝,消息通过死信交换机路由到死信队列,监听死信队列的消费者接收并成功消费。
正常消息队列:queue.normal.video
,由于消息刚到达就被消费者接收,所以Queued messages
没有变化
同一时间,死信队列也是刚接收消息就被消费端消费,所以Queued messages
没有变化
消费端控制台打印:
★[normal]接收消息,但拒绝消息且不重新放入队列...
★[dead letter]监听死信队列,接收到死信消息...
★[dead letter]dataString = ★[normal]发送消息--正常交换机--正常消息队列...
7.3 死信–超时和溢出
前面创建正常消息队列时就置顶了正常消息队列最大消息数为10(
x-max-length=10
)且最大生存时间为10s(x-message-ttl=10000
)
先关闭消费者,向正常消息队列发送20条消息
@Test
public void testSendMessageButReject() {
for (int i = 0; i < 20; i++) {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"★[normal]发送消息--正常交换机--正常消息队列...");
}
}
- 发送者发送20条消息(m1,m2,…,m19,m20)
- 前十条消息(m1,m2,…,m9,m10)正常进入消息队列,到达最大消息数
- 后十条消息(m11,m12,…,m19,m20)进入消息队列,根据队列先进先出,前十条消息(m1,m2,…,m9,m10)溢出
- 这十条消息(m1,m2,…,m9,m10)通过死信交换机进入死信队列(对应死信队列第一个上坡)
- 后十条消息(m11,m12,…,m19,m20)超过10s未被消费,超时,后十条消息(m11,m12,…,m19,m20)也进入死信队列(对应死信队列第二个上坡)
消费者端省略,就还是监听然后消费…
8 延时队列
延时队列有两种实现思路
- 借助超时时间+死信队列来实现延时队列
- 通过RabbitMQ插件来完成延时队列
插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
需要将插件放入rabbitmq中容器的?/plugins
目录,我们来看看该目录映射到宿主机的哪个目录?
docker inspect rabbitmq
运行结果:
"Mounts": [
{
"Type": "volume",
"Name": "rabbitmq-plugin",
"Source": "/var/lib/docker/volumes/rabbitmq-plugin/_data",
"Destination": "/plugins",
"Driver": "local",
"Mode": "z",
"RW": true,
"Propagation": ""
},
{
"Type": "volume",
"Name": "0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4",
"Source": "/var/lib/docker/volumes/0f14d68cf8aad70d33dfe59651cf0c05c0598d1e2223d0154cf6689a9dfb96f4/_data",
"Destination": "/var/lib/rabbitmq",
"Driver": "local",
"Mode": "",
"RW": true,
"Propagation": ""
}
],
和容器内/plugins
目录对应的宿主机目录是:/var/lib/docker/volumes/rabbitmq-plugin/_data
## 8.1 下载延迟插件
RabbitMQ社区插件:https://www.rabbitmq.com/community-plugins.html
延迟插件:
下载插件安装文件,并移动到对应目录
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
mv rabbitmq_delayed_message_exchange-3.13.0.ez /var/lib/docker/volumes/rabbitmq-plugin/_data
启用插件
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq
延迟插件启动成功:
8.2 延迟插件的使用
8.2.1 生产者端
通过MessageProcessor来设置延迟时间
@Test
public void testSendDelayMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY,
ROUTING_KEY_DELAY,
"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",
messageProcessor -> {
// 设置延迟时间:以毫秒为单位
messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
return messageProcessor;
});
}
8.2.2 消费者端
① ui界面创建延迟交换机和队列
使用插件创建交换机exchange.delay.happy
使用rabbitmq_delayed_message_exchange
插件要求交换机type=x-delayed-message
,并通过x-delayed-type
设置交换机的类型(direct、fanout、topic
),创建方式如下:
创建消息队列queue.delay.video
并绑定exchange.delay.happy
交换机
@Component
@Slf4j
public class MyDelayMessageListener {
public static final String QUEUE_DELAY = "queue.delay.video";
@RabbitListener(queues = {QUEUE_DELAY})
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
②代码创建延迟交换机和队列
@Component
@Slf4j
public class MyDelayMessageListener {
public static final String EXCHANGE_DELAY = "exchange.delay.video";
public static final String ROUTING_KEY_DELAY = "routing.key.delay.video";
public static final String QUEUE_DELAY = "queue.delay.video";
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),
exchange = @Exchange(
value = EXCHANGE_DELAY,
durable = "true",
autoDelete = "false",
type = "x-delayed-message",
arguments = @Argument(name = "x-delayed-type", value = "direct")),
key = {ROUTING_KEY_DELAY}
))
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
8.2.3 效果展示
前面消息可靠投递中说过,消息发送后回调
confirm()
,而returnMessage()
只有在消息发送失败才会回调,但是使用rabbitmq_delayed_message_exchange
插件后,即使消息成功发送到队列上,也会导致returnedMessage()
方法执行(问题不大嘛)
消费端效果:
[生产者]测试基于插件的延迟消息 [12:41:29]
[消费者]12:41:39
9 事务消息
9.1 什么是事务消息?
将生产者发送消息的操作打包成一个原子操作,要么全部成功要么全部失败,通过事务消息保证消息发送的原子性
RabbitMQ 的事务消息有点类似 Spring 的事务,分为开始事务、提交事务、回滚事务。
txSelect()
:开始事务,使用txSelect()
开启事务。txCommit()
:提交事务,如果txCommit()
提交事务成功了,则消息一定会发送到RabbitMQ
。txRollback()
:回滚事务,如果在执行txCommit()
之前RabbitMQ
发生了异常,txRollback()
会捕获异常进行回滚。
RabbitMQ
发送事务消息流程:txSelect
开启事务,消息发送到 RabbitMQ
缓存,接着 txCommit
提交事务,txCommit
成功后则消息一定发送到了 RabbitMQ。
如果在 txCommit
完成前出现任何异常,我们就捕获这个异常然后执行 txRollback
进行回滚操作,整个过程跟 Spring 的事务机制没太大的区别。因此,我们可以通过 RabbitMQ 事务机制保证消息一定可以发送成功。
了解了 RabbitMQ 的事务消息机制,接下来我们就分享两种方式来实现 RabbitMQ 事务消息
9.2 Springboot发送事务消息
9.2.1 准备工作
-
改pom.xml
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
-
写yaml
spring: rabbitmq: host: 192.168.200.100 port: 5672 username: guest password: 123456 virtual-host: /
-
主启动
-
事务配置
@Configuration @Data public class RabbitConfig { @Bean public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager(connectionFactory); } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } }
9.2.2 没有事务消息
没有事务消息,无法保证消息发送原子性
@SpringBootTest
@Slf4j
public class RabbitMQTest {
public static final String EXCHANGE_NAME = "exchange.tx.dragon";
public static final String ROUTING_KEY = "routing.key.tx.dragon";
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
// 2、抛出异常
log.info("do bad:" + 10 / 0);
// 3、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
}
}
抛出异常前的消息发送了,抛异常后的消息没有发送:
9.2.3 使用事务消息
因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
log.info("do bad:" + 10 / 0);
// 2、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}
9.3 Channel发送事务消息
工具类
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RabbitMqUtil {
public static Channel getChannel() {
// 创建一个连接工厂,并设置MQ的相关信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("xxxxxx");
factory.setUsername("xxx");
factory.setPassword("xxx");
factory.setVirtualHost("/xxx");
Channel channel = null;
try {
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
channel = connection.createChannel();
} catch (Exception e) {
log.error("创建 RabbitMQ Channel 失败", e);
e.printStackTrace();
}
return channel;
}
}
Channel发送事务消息
import com.rabbitmq.client.Channel;
import com.user.service.util.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class RabbitTransactionChannelProducer {
@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
public void sendTransactionaChannelMessage(String message) {
//获取 Channel
Channel channel = RabbitMqUtil.getChannel();
try {
//开启事务
channel.txSelect();
//发送消息
channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));
channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));
//发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了
//int a = 1 / 0;
channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));
//提交事务
channel.txCommit();
} catch (Exception e) {
//回滚事务
try {
channel.txRollback();
} catch (IOException ex) {
log.error("txRollback error", e);
ex.printStackTrace();
}
e.printStackTrace();
} finally {
try {
channel.close();
} catch (Exception e) {
log.error("channel close error", e);
e.printStackTrace();
}
}
}
}
10 惰性队列
创建队列分两种:
-
default默认消息队列:消息存储在内存,当队列内存限制触发才会将部分消息移到磁盘
-
lazy惰性消息队列:消息尽可能地保存在磁盘,内存中只保持必要的元数据
惰性队列,将消息尽可能地保存在磁盘,减少内存的使用。有效防止由于队列消息过多导致的内存溢出,是处理需要处理大量消息但内存有限的场景。但是由于消息存于磁盘,生产者发送消息和消费者消费比普通队列慢,尤其在高吞吐场景
队列创建置顶模式方式有:使用队列策略(建议)和设置queue.declare
参数
如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
10.1 队列策略设定
# 登录Docker容器
docker exec -it rabbitmq /bin/bash
# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
-
rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
-
set_policy是子命令,表示设置策略
-
Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
-
"^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
-
'{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
-
–-apply-to参数指定该策略将应用于队列(queues)级别
-
命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列
如果需要修改队列模式可以执行如下命令(不必删除队列再重建):
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
10.3 queue.declare
参数设定
参数
x-queue-mode
设定队列创建模式,lazy
和default
(默认)
Java代码原生API设置方式:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);
Java代码注解设置方式:
@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
@Argument(name = "x-queue-mode", value = "lazy")
})
11 优先级队列
优先级队列允许你根据消息的优先级来处理消息。消息默认先进先出,通过设置不同的优先级值,消费者可以优先处理重要或紧急的消息,而延迟处理优先级较低的消息
11.1 准备工作
-
创建交换机:
exchange.test.priority
-
创建消息队列:
queue.test.priority
RabbitMQ消息优先级范围 1到255 ,建议使用 1到5(数字越大优先级越高)
通过设置
x-max-priority
来指定消息队列的最大优先级,默认为0。而消息的优先级不能大于x-max-priority
,所以使用优先级队列一定要指定x-max-priority
,这里指定为x-max-priority=10
-
改POM
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.1.5</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
-
YAML
spring: rabbitmq: host: 192.168.200.100 port: 5672 username: guest password: 123456 virtual-host: /
-
主启动
11.2 使用优先级队列
不要启动消费者程序,让多条不同优先级的消息滞留在队列中
- 第一次发送优先级为1的消息
- 第二次发送优先级为2的消息
- 第三次发送优先级为3的消息
先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息
消息生产者:
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
//第一次发送优先级为1的消息
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
message.getMessageProperties().setPriority(1);
return message;
});
//第二次发送优先级为2的消息
//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{
// message.getMessageProperties().setPriority(2);
// return message;
//});
//第三次发送优先级为3的消息
//rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{
// message.getMessageProperties().setPriority(3);
// return message;
//});
}
}
消费端:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MyMessageProcessor {
public static final String QUEUE_PRIORITY = "queue.test.priority";
@RabbitListener(queues = {QUEUE_PRIORITY})
public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
log.info(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
效果:
I am a message with priority 3.
I am a message with priority 2.
I am a message with priority 1.
原文地址:https://blog.csdn.net/weixin_63267854/article/details/145212784
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!