自学内容网 自学内容网

【RabbitMq】RabbitMq高级特性-延迟消息

什么是延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。
延迟任务:设置在一定时间之后才执行的任务
在这里插入图片描述
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

死信交换机有什么作用呢?

  • 收集那些因处理失败而被拒绝的消息
  • 收集那些因队列满了而被拒绝的消息
  • 收集因TTL(有效期)到期的消息

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信
    在这里插入图片描述

实现代码
发送者

    @Test
    public void SendNormalMessage() {
        String exchangeName = "normal.direct";
        String message = "Hello normal";
        this.rabbitTemplate.convertAndSend(exchangeName, "dlx", message, message1 -> {
            message1.getMessageProperties().setExpiration("2000");
            return message1;
        });
    }

消费者

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "dlx.queue"),
            exchange = @Exchange(name = "dlx.direct"),
            key = {"dlx"}
    ))
    public void istenerdlxQueue2(String msg){
        log.info("消费者.。。。。。。。监听到 dlx.queue 的消息【{}】",msg);
    }
@Configuration
public class SpringRabbitConfig {
    
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange("normal.direct");
    }

    @Bean
    public Queue normalQueue(){
        return QueueBuilder
                .durable("normal.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    @Bean
    public Binding normalQueueBinding(Queue normalQueue, DirectExchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("dlx");
    }
    
}

注:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

延迟消息插件-DelayExchange

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

在这里插入图片描述
基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
延迟消息插件
插件下载地址

由于我安装的MQ是3.8.x版本,因此这里下载3.8.17版本,下载时注意版本对应
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

docker volume inspect mq-plugins

结果如下:
在这里插入图片描述

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。
接下来执行命令,安装插件:

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

执行结果:
在这里插入图片描述

声明延迟交换机
基于注解方式:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

基于@Bean的方式:

@Slf4j
@Configuration
public class DelayExchangeConfig {

    @Bean
    public DirectExchange delayExchange(){
        return ExchangeBuilder
                .directExchange("delay.direct") // 指定交换机类型和名称
                .delayed() // 设置delay的属性为true
                .durable(true) // 持久化
                .build();
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue("delay.queue");
    }
    
    @Bean
    public Binding delayQueueBinding(){
        return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
    }
}

注:
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息。

其他文章

发送者可靠性
消费者可靠性
延迟信息


原文地址:https://blog.csdn.net/weixin_44917201/article/details/145229295

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!