自学内容网 自学内容网

RabbitMQ延迟队列

1、延迟队列使用场景

https://blog.csdn.net/weixin_42942786/article/details/139940269

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;

2、使用rabbitmq-delayed-message-exchange 延迟插件

2.1、下载

选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,
下载地址:http://www.rabbitmq.com/community-plugins.html

在这里插入图片描述
在这里插入图片描述

2.2、安装

插件拷贝到 RabbitMQ 服务器plugins目录下

2.2.1、解压

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

如果unzip 没有安装,先安装一下

yum install unzip -y

2.2.2、启用插件

./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件;

2.2.3、查询安装情况

./rabbitmq-plugins list 查询安装的所有插件;
重启rabbitmq使其生效;(此处也可以不重启)
在这里插入图片描述
消息发送后不会直接投递到队列,
而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题。

2.4、示例

在这里插入图片描述

2.4.1、RabbitConfig配置类(关键代码)

package com.power.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {

    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueDelayName}")
    private String queueDelayName;

    //创建自定义交换机
    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);
    }

    //创建队列
    @Bean
    public Queue queueNormal() {
        //建造者模式创建队列
        return QueueBuilder
                .durable(queueDelayName)//队列名称
                .build();
    }

    //队列绑定交换机
    @Bean
    public Binding binding(CustomExchange customExchange, Queue queueNormal) {
        return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();
    }

}

2.4.2、发送消息(关键代码)

package com.power.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;

@Service
@Slf4j
public class MessageService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Bean
    public void sendMsg(){
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay",25000);//第一条消息设置过期时间25秒
            Message message = MessageBuilder.withBody("hello world 01".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);
            log.info("消息order发送完毕,发送时间是:{}",new Date());
        }
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay",15000);//第二条消息设置过期时间15秒
            Message message = MessageBuilder.withBody("hello world 02".getBytes())
                    .andProperties(messageProperties)
                    .build();
            rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);
            log.info("消息pay发送完毕,发送时间是:{}",new Date());
        }

    }
}

2.4.3、application.yml配置类

server:
  port: 8080
spring:
  application:
    name: delay-plugins-test01

  rabbitmq:
    host: 你的服务器IP
    port: 5672
    username: 你的账号
    password: 你的密码
    virtual-host: power

my:
  exchangeName: exchange.delay.04         # 交换机
  queueDelayName: queue.delay.04          # 正常队列

2.4.4、接收消息

package com.power.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class ReceiveMessage {
    @RabbitListener(queues = "queue.delay.04")
    public void receiveMsg(Message message){
        String body = new String(message.getBody());
        log.info("接收到的消息为:{},接收时间为:{}",body,new Date());
    }
}

2.4.5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.power</groupId>
  <artifactId>rabbit_07_delay04_plugins</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>rabbit_07_delay04_plugins</name>

  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.13</version>
    <relativePath/>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.24</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>

2.4.6、测试

在这里插入图片描述


原文地址:https://blog.csdn.net/qq_46112274/article/details/143563861

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!