自学内容网 自学内容网

RabbitMQ学习-Ten

背景/需求:

消息从发送,到消费者接收,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失原因包括:

  • 发送时丢失:
  • 生产者发送的消息【未送达exchange】——返回nack(消息确认模式)
  • 消息【到达exchange】——返回ack(消息确认模式)
  • 到达queue后,MQ宕机,queue将消息丢失 ——返回ACK,及路由失败原因(回退模式)
  • consumer接收到消息后还未消费就宕机——消息持久化

 为了避免消息丢失,我们也可以使用RabbitMQ的事务机制:类似于MySQL的事务机制,开启事务、提交事务、事务回滚。但是我们一般不使用这种机制来保证可靠性,因为使用事务的话效率要降低几十甚至上百倍!!我们一般通过消息确认和Return机制来保证可靠性。

// 定义一个“订阅交换机”
channel.exchangeDeclare("ex3", BuiltinExchangeType.FANOUT);

// 定义一个“路由交换机”
channel.exchangeDeclare( "ex4", BuiltinExchangeType.DIRECT);

// 绑定队列
channel.queueBind( "queue7",  "ex4",  "k1");
channel.queueBind( "queue8",  "ex4", "k2");

// 发送消息到交换机
channel.txSelect(); // 开启事务
try {
    // 通过他的事务
    channel.basicPublish( "ex4",  "k1", basicProperties: null, msg.getBytes());
    System.out.println("发送:" + msg);
    channel.txCommit(); // 提交事务
} catch (Exception e) {
    channel.txRollback(); // 事务回滚
}

// 关闭通道和连接
channel.close();
connection.close();

Maven的消息确认机制:

为了保证消息从队列可靠的达到消费者,RabbitMQ提供了消息确认机制(Message Acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 参数等于 false 时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移除消息(实际上是先打上删除标记,之后在删除)。当 autoAck 参数等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。(简单来说就是确认消息提供者是否成功发送消息到交换机。)

  • 发送消息之前开启消息确认channel.confirmSelect()
  • 发送消息channel.basicPublish()
  • 接收消息确认:使用boolean b = channel.waitForConfirms()方法检查消息是否已被确认。
  • 这种方式再发送的所有消息中,如果有一条是失败的,则所有消息直接发送失败,抛出IO异常;

 如果我们批量发消息时间长会导致waitForConfirms进入阻塞状态我们不想让他进入阻塞状态可以通过监听器实现;我的理解是每次发消息都会被监听,然后这里打印信息

/使用监听器异步confirm
channel.addConfirmListener(new ConfirmListener(){
//参数1:longl 返回消息的表示
//参数2:boolean b 是否为批量confirm
    public void handleAck(long l, boolean b)throws IOException {
        System.out.println("~~~~~消息成功发送到交换机");
    }
    public void handleNack(long l, boolean b)throws IOException {
        System.out.println("~~~~~消息发送到交换机失败");
    }

需要注意的是,如果我们使用异步监听器的形式,就不要关闭资源了因为有可能先关闭了我们还没有发完消息呢!

我们的消息确认机制只能监听生产者是否将消息发送到交换机,而监听不到交换机到队列的过程,此时就需要引入另一个机制,叫做Return机制;

Maven的Return机制的实现

生产者需要在发送消息之前,为连接或通道添加Return监听器。这个监听器会监听所有无法被路由的消息,并调用相应的回调方法。

如果交换机分发消息到队列失败,则会执行此方法(用来处理交换机分发消息到队列失败的情况)

channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // 处理无法被路由的消息
        System.out.println("消息无法被路由到队列,replyCode: " + replyCode + ", replyText: " + replyText);
        System.out.println("交换机: " + exchange + ", 路由键: " + routingKey);
        System.out.println("消息内容: " + new String(body));
    }
});

//channel.basicPublish("exchangeName", "routingKey", properties, "Hello HuangDaoJun!".getBytes());

channel.basicPublish("exchangeName", "routingKey", true, properties, "Hello HuangDaoJun!".getBytes());
需要注意的是这里的第三个参数,如果是空就只开启了消息确认机制;如果是true表示开启返回机制


原文地址:https://blog.csdn.net/qq_58738794/article/details/144229104

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!