自学内容网 自学内容网

RabbitMQ消息接收类Receiver

Receiver 类会使用 @RabbitListener 注解,来表示该类是一个消息接收器,用于处理来自 RabbitMQ 队列的消息

@Component
public class Receiver {
    @RabbitListener(queues = RabbitMQConstant.QUEUE_HELLO)
    public void hello(String hello) {
        System.out.println ("Receiver(hello) : "  + hello);
    }
 
    @RabbitListener(queues = RabbitMQConstant.QUEUE_HI)
    public void hi(String payload) {
        System.out.println ("Receiver(hi) : "  + payload);
        throw new RuntimeException("手动造的异常");
    }
类的结构

注解 @Component:
表示这个类是一个 Spring 组件,Spring 将自动识别并注册这个类,允许其作为 Spring 上下文中的一个 Bean。

注解 @RabbitListener:
用于标记一个方法为 RabbitMQ 消息监听器。当指定的队列收到消息时,RabbitMQ 会调用这个方法来处理消息。

方法分析

1. hello 方法

@RabbitListener(queues = RabbitMQConstant.QUEUE_HELLO)
public void hello(String hello) {
    System.out.println("Receiver(hello) : " + hello);
}

功能:这个方法用于处理来自 QUEUE_HELLO 队列的消息。
参数:接收一个 String 类型的消息(在这里命名为 hello)。
处理逻辑:打印接收到的消息内容。
监听机制:当 QUEUE_HELLO 队列收到消息时,RabbitMQ 会自动调用这个方法。

2. hi 方法

@RabbitListener(queues = RabbitMQConstant.QUEUE_HI)
public void hi(String payload) {
    System.out.println("Receiver(hi) : " + payload);
    throw new RuntimeException("手动造的异常");
}

功能:这个方法用于处理来自 QUEUE_HI 队列的消息。
参数:接收一个 String 类型的消息(在这里命名为 payload)。
处理逻辑:打印接收到的消息内容,然后手动抛出一个 RuntimeException 异常。
监听机制:当 QUEUE_HI 队列收到消息时,RabbitMQ 会自动调用这个方法。

异常处理

手动抛出异常:在 hi 方法中,咱们手动抛出了一个 RuntimeException。这会导致消息处理失败。
RabbitMQ 默认行为:当 @RabbitListener 方法抛出异常时,RabbitMQ 的默认行为是:
消息将被标记为“未确认”(unacknowledged),即 RabbitMQ 不会认为该消息已成功处理。
根据 RabbitMQ 的配置,可能会对消息进行重试或者将其发送到死信队列(Dead Letter Queue, DLQ)。

处理异常的最佳实践
重试机制:您可以使用 Spring 的重试机制来配置失败的重试次数和间隔。
自定义异常处理:可以通过实现 ErrorHandler 接口或使用 @RabbitListener 的 errorHandler 属性来定义自定义的错误处理策略。
死信队列:将无法成功处理的消息发送到死信队列,以便后续分析和处理。

示例改进

@RabbitListener(queues = RabbitMQConstant.QUEUE_HI, 
               ackMode = "MANUAL", 
               errorHandler = "myErrorHandler")
public void hi(String payload, Channel channel, Message message) {
    try {
        System.out.println("Receiver(hi) : " + payload);
        // 业务逻辑处理
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理异常逻辑
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

1. @RabbitListener 注解
功能:略
属性解释:
queues = RabbitMQConstant.QUEUE_HI:指定该监听器要监听的队列,RabbitMQConstant.QUEUE_HI 是一个常量,表示消息将从这个队列中获取。

ackMode = “MANUAL”:设置消息确认模式为手动确认。这意味着消息的确认(acknowledgment)将由代码中显式调用,而不是自动确认。

errorHandler = “myErrorHandler”:指定一个自定义的错误处理器,用于处理在消息处理过程中发生的异常。

2. 方法参数
String payload:接收的消息内容,类型为 String。
Channel channel:RabbitMQ 的通道对象,允许您手动确认或拒绝消息。
Message message:RabbitMQ 的消息对象,包含了消息的元数据和其他信息。

3. 方法逻辑
try 块:

System.out.println("Receiver(hi) : " + payload);
// 业务逻辑处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

sout:打印接收到的消息内容。
在这里您也可以替换/添加任何业务逻辑来处理接收到的消息。

channel.basicAck(…):手动确认消息的处理。这个调用告诉 RabbitMQ 消息已经成功处理,可以从队列中移除。getDeliveryTag() 方法获取消息的唯一标识符。

catch 块:

catch (Exception e) {
    // 处理异常逻辑
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}

如果在 try 块中发生异常,将执行此部分。
channel.basicNack(…):拒绝消息的处理。该调用告诉 RabbitMQ 该消息处理失败,参数说明如下:

第一个参数是消息的唯一标识符(delivery tag)。
第二个参数设置为 false,表示不批量处理。
第三个参数设置为 true,表示将消息重新放回队列中,以便其他消费者可以尝试处理。

这种模式在需要精细控制消息确认和错误处理时非常有用。


原文地址:https://blog.csdn.net/ll2695015910/article/details/142799656

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!