RabbitMQ学习-Ten
背景/需求:
消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失:
- 生产者发送的消息【未送达exchange】——返回nack(消息确认模式)
- 消息【到达exchange】——返回ack(消息确认模式)
- 到达queue后,MQ宕机,queue将消息丢失 ——返回ACK,及路由失败原因(回退模式)
- consumer接收到消息后还未消费就宕机——消息持久化
为了避免消息丢失,我们也可以使用RabbitMQ的事务机制:类似于MySQL的事务机制,开启事务、提交事务、事务回滚。但是我们一般不使用这种机制来保证可靠性,因为使用事务的话效率要降低几十甚至上百倍!!我们一般通过消息确认和Return机制来保证可靠性。
// 定义一个“订阅交换机”
channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);
// 定义一个“路由交换机”
channel.exchangeDeclare( "ex4", BuiltinExchangeType.DIRECT);
// 绑定队列
channel.queueBind( "queue7", "ex4", "k1");
channel.queueBind( "queue8", "ex4", "k2");
// 发送消息到交换机
channel.txSelect(); // 开启事务
try {
// 通过他的事务
channel.basicPublish( "ex4", "k1", basicProperties: null, msg.getBytes());
System.out.println("发送:" + msg);
channel.txCommit(); // 提交事务
} catch (Exception e) {
channel.txRollback(); // 事务回滚
}
// 关闭通道和连接
channel.close();
connection.close();
Maven的消息确认机制:
为了保证消息从队列可靠的达到消费者,RabbitMQ提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。(简单来说就是确认消息提供者是否成功发送消息到交换机。)
- 发送消息之前开启消息确认:
channel.confirmSelect()
。- 发送消息:
channel.basicPublish()
。- 接收消息确认:使用boolean b =
channel.waitForConfirms()
方法检查消息是否已被确认。- 这种方式再发送的所有消息中,如果有一条是失败的,则所有消息直接发送失败,抛出IO异常;
如果我们批量发消息时间长会导致waitForConfirms进入阻塞状态我们不想让他进入阻塞状态可以通过监听器实现;我的理解是每次发消息都会被监听,然后这里打印信息
/使用监听器异步confirm
channel.addConfirmListener(new ConfirmListener(){
//参数1:longl 返回消息的表示
//参数2:boolean b 是否为批量confirm
public void handleAck(long l, boolean b)throws IOException {
System.out.println("~~~~~消息成功发送到交换机");
}
public void handleNack(long l, boolean b)throws IOException {
System.out.println("~~~~~消息发送到交换机失败");
}
需要注意的是,如果我们使用异步监听器的形式,就不要关闭资源了因为有可能先关闭了我们还没有发完消息呢!
我们的消息确认机制只能监听生产者是否将消息发送到交换机,而监听不到交换机到队列的过程,此时就需要引入另一个机制,叫做Return机制;
Maven的Return机制的实现
生产者需要在发送消息之前,为连接或通道添加Return监听器。这个监听器会监听所有无法被路由的消息,并调用相应的回调方法。
如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理无法被路由的消息
System.out.println("消息无法被路由到队列,replyCode: " + replyCode + ", replyText: " + replyText);
System.out.println("交换机: " + exchange + ", 路由键: " + routingKey);
System.out.println("消息内容: " + new String(body));
}
});
//channel.basicPublish("exchangeName", "routingKey", properties, "Hello HuangDaoJun!".getBytes());
channel.basicPublish("exchangeName", "routingKey", true, properties, "Hello HuangDaoJun!".getBytes());
需要注意的是这里的第三个参数,如果是空就只开启了消息确认机制;如果是true表示开启返回机制
原文地址:https://blog.csdn.net/qq_58738794/article/details/144229104
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!