自学内容网 自学内容网

Rabbitmq消息队列,安装,使用,三种工作模式

产品

消息队列技术介绍
消息队列概述

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能、高可用、可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

目前在生产环境,使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

MQ概述

MQ全称 Message Queue([kjuː])(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

(队列是一种容器,用于存放数据的都是容器,存放消息的就是消息队列)

分布式系统的调用:

方式一:直接调用

order

product

account

方式二:间接调用

A将数据存放到中间一个系统,通过中间的系统发送到B

中间系统可以成为中间件MQ

生产者-》中间件《--消费者

MQ是用于存放消息的中间件

被调用者叫生产者 调用者是消费者(微服务中说过)

MQ的优势和劣势

优势

应用解耦:提高系统容错性和可维护性。

异步提速:提升用户体验和系统吞吐量。

削峰填谷:提高系统稳定性。

应用解耦

系统的耦合性越高,容错性就越低,可维护性就越低。

例:订单系统 的时候 依赖于库存系统 支付系统 物流系统 当库存系统发生异常,就有可能导致订单系统发生异常 下单失败

追加系统 x 就只能修改订单系统更改代码 导致维护性比较低

使用 MQ 使得应用间解耦,提升容错性和可维护性

库存系统宕机订单系统影响不大,因为消息已经发送到mq了当库存系统恢复的时候就可以正常使用了。

追加系统的时候跟订单系统无关

已将数据发送到MQ了,直接从MQ中拿就行了,无需更改订单中的代码,可维护性提高

异步提速

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms

用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。

提升用户体验和系统吞吐量(单位时间内处理请求的数目)。

以前920ms处理一个请求,现在25ms处理一个请求,系统的吞吐量(单位时间内访问量)增加

削峰填谷(削峰)

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

使用MQ后,可以提高系统稳定性。

安装rabbitmq

安装依赖环境

yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
方式一

使用rpm安装

创建好要放入的文件夹

把我们需要的包都放进文件夹里

然后用rpm命令安装

rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

安装完毕

启动

systemctl start rabbitmq-server

开启管理页面

rabbitmq-plugins enable rabbitmq_management

修改配置文件

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app 

将原来39行的内容改为

保存退出

重启

 systemctl restart rabbitmq-server

浏览器访问管理页面

http://192.168.184.34:15672/

默认用户密码都为guest

方式二

安装

yum install -y erlang
yum install -y rabbitmq-server

只需要这两条命令

配置管理页面

开启管理页面

rabbitmq-plugins enable rabbitmq_management

不需要修改配置文件直接使用guest/guest 就可以登录

启动

systemctl start rabbitmq-server

访问

http://192.168.184.34:15672/

注意:

Rabbitmq 端口号是:5672

管理端端口号是:15672

操作管理

添加用户

想给这个用户设置可以操作的虚拟主机

添加虚拟主机

把/yyl虚拟主机 赋给yyl用户进行操作

出现下图就是成功

添加消息队列

添加了一个队列后我们可以生产一条消息

点进我们刚创好的队列里

找到Publish message 生产一条消息然后点击Publish message 发送

可以看到我们的队列这里多了一条消息

消费消息

添加依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

编写配置类

package com.example.rabbitmq01.listener;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyListener {
​
    @RabbitListener(queues = "test-one1")
    public void listen(Message message){
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println("监听到的消息是:"+s);
    }
​
​
}

yml文件

spring:
  rabbitmq:
    host: 192.168.227.99
    username: guest
    password: guest
    virtual-host: /cxy
server:
  port: 8090

启动

消费消息

成功监听到消息

消息被消费

添加交换机

创建好交换机,绑定我们的两个队列

绑定好之后交换机这里会显示,也可以看到绑定成功

因为我们默认交换机创建时是广播模式,所以交换机发送消息时会像广播一样所有的队列都能收到发送的消息

交换机发送消息,点击我们刚创建好的交换机

发送消息

可以看到,发送完消息后,因为我们的交换机的模式为广播模式,跟它绑定的队列都会收到消息

我们再次消费一下消息

可以看到,两个队列接收到的消息都是一样的

工作模式

可以看到,上面我们演示了三种工作模式,也分别对应了下面这三种

  1. 简单工作模式

生产者 队列 消费者

生产出一个消息给我们的队列,然后我们用工具去消费我们的消息

  1. 工作模式

队列之后

一个队列和两个消费者,两个消费者轮询接收消息

3.发布订阅模式

将我们的消息给交换机,交换机有三种模式:

Fanout: 广播模式 没有routingkey

没有key值,能给所有绑定的队列发送消息

Direct: 路由模式 有routingkey 值是固定的

有固定的key值,发送消息时可以选择key值发送,

也就是指定发送消息到哪个队列

Topic:主题模式 有routingkey 值是有通配符

没有固定的key值,但是有固定的格式,按消息的格式指定发送到哪个队列里

消息的可靠性传递

确认模式

Confirm

确认消费者是否接收到消息

回退模式

Return

查看交换机中的消息是否传递到队列里面

springboot整合rabbitmq

用代码创建一个交换机和两个队列,并且让两个队列绑定在交换机上,交换机的模式为路由模式,选择key值给两个中某个key值匹配的队列发消息,并且监听到消息

创建微服务,也就是在外面创建一个项目,里面有多个模块,我们只需在里面创建两个模块即可,一个作为生产者去生产消息,一个作为消费者去消费我们的数据

创建微服务

外面的项目只需留一个pom文件就好,在添加两个module模块,

右键创建模块

添加依赖(注:只需要在最外层的父项目添加依赖即可,子项目也可以使用夫项目的依赖)

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.2</version>
    </parent>
        <!--web项目-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
​
        <!-- 测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

两个子项目的yml文件

spring:
  rabbitmq:
    host: 192.168.227.99
    username: guest
    password: guest
    virtual-host: /cxy

生产者

创建交换机和两个队列

绑定

将两个队列绑定到交换机上并且附上了对应的key

package com.example.producter.config;
​
​
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class RabbitConfig {
​
    // 声明一个交换价
​
    public static final String EXCHANGE_NAME = "ROUTING_EXCAHNGE";
    public static final String QUEUE_NAME_1 = "ROUTING_QUEUE_1";
    public static final String QUEUE_NAME_2 = "ROUTING_QUEUE_2";
    public static final String ROUTING_KEY = "a";
​
    @Bean("exchange")
    public Exchange getExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(false).build();
​
    }
​
    @Bean("queue1")
    public Queue getQueue1() {
        // 创建了一个队列  队列的名字是ROUTING_QUEUE_1 不持久化
        return QueueBuilder.nonDurable(QUEUE_NAME_1).build();
​
    }
​
    @Bean("queue2")
    public Queue getQueue2() {
        // 创建了一个队列  队列的名字是ROUTING_QUEUE_1 不持久化
        return QueueBuilder.nonDurable(QUEUE_NAME_2).build();
​
    }
​
    //绑定
    @Bean
    public Binding getBinding1(@Qualifier("exchange") Exchange exchange, @Qualifier("queue1")Queue queue) {
        // 交换机和队列进行绑定
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
​
    }
​
    @Bean
    public Binding getBinding2(@Qualifier("exchange") Exchange exchange, @Qualifier("queue2")Queue queue) {
        // 交换机和队列进行绑定
        return BindingBuilder.bind(queue).to(exchange).with("b").noargs();
    }
​
    @Bean
    public Binding getBinding3(@Qualifier("exchange") Exchange exchange, @Qualifier("queue2")Queue queue) {
        // 交换机和队列进行绑定
        return BindingBuilder.bind(queue).to(exchange).with("c").noargs();
    }
}
生产消息

package com.example.producter;
​
import com.example.producter.config.RabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
​
import javax.annotation.Resource;
​
@SpringBootTest
class ProducterApplicationTests {
​
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME,"b","这是给消息队列b发的消息");
    }
​
}
启动这个子项目

添加成功

绑定成功

消息发送成功

消费者

接收消息

启动消费者项目

可以看到成功消费掉队列的消息


原文地址:https://blog.csdn.net/Cao_XinYang/article/details/142445115

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!