自学内容网 自学内容网

RabbitMQ的工作模式

(一)工作模式

RabbitMQ有7种工作模式来进行消息传递,我们上一篇博客就是简单模式

1.简单模式(simple)

 也就是点对点的形式

P就是生产者,C就是消费者,Queue就是消息队列(生产者向queue种放消息,consume从中取消息)

特点:只有一个生产者和一个消费者,消息只被消费一次,又叫点对点模式

2.工作队列(work queue)

一个生产者P,多个消费者,在多个消息的情况下,queue会把消息分派给不同的消费者,每个消费者会收到不同的消息(多个消费者共同处理一个生产者的消息)

特点:消息不会重复,分配给不同消费者

适用场景:集群环境中共同处理问题

3.发布/订阅(publish/subscribe)

图中X叫交换机,之所以上面两个图没画交换机是因为上面两个交换机只有映射(定向)作用,而这里的交换机起到了其他作用

那我们这里就先插入讲一下交换机的分类

exchange:交换机

作用:接收生产者发送的消息,并且按照一定的规则映射到一个或者多个的消息队列中

分类:RabbitMQ交换机有四种类型,fanout,direct,topic,headers,不同类型有着不同的路由策略,AMQP协议里还有另外两种(system和自定义)但是RabbitMQ中没有就不多说

1.fanout(广播):把消息交给所有绑定到交换机的队列(也就是上述的发布订阅模式)

2.direct(定向):把消息(消息中有一个routing key)交给符合指定binding key的队列(routing模式)

3.topic(通配符):把消息交给符合routing模式的队列(topics模式)

4.headers类型:不依赖于路由键的匹配规则来路由消息,⽽是根据发送的消息内容中的 headers属性进⾏匹配.headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.

交换机只负责转发消息,没有存储消息的能力,因此如果没有任何队列与exchange绑定时,消息就会丢失

那我们通过什么进行绑定?

Routing Key:生产者将消息发送给交换机时,只当的字符串,用来告诉交换机如何处理这个消息

Binging key: RabbitMQ中通过bindingkey将交换机与队列关联起来,在绑定时一般指定一个binding key,这样Rabbitmq就知道如何正确把消息映射到队列了

就比如这张图,如果我们生产者给交换机的routing key是orange,那么消息就会到Q1 

其实Binding key也是routing key的一种,为了避免我们混淆

我们可以认为,生产者(发送消息)给交换机的是Routingkey,在交换机绑定队列时是Binding key

4.路由模式(Rounting)

我们依然用这张图

我们可以说路由模式是发布订阅模式的变种,通过我们刚刚说的Routingkey和Bindingkey进行映射到对应的队列中,所以区别就是发布订阅模式是无差别发送给任何此交换机绑定的队列中,而路由模式要在此交换机绑定的队列中找到满足Routingkey和Bindingkey相等的队列再传消息

5.通配符模式(Topic)

  路由模式的升级版,也就是我们的Bindingkey可以支持通配符了,让我们的映射更加灵活,与路由模式思路是相同的

每个.分割一个单词,*只可以表示一个单词,#可以表示多个单词

6.rpc通信

  在rpc通信中没有生产者消费者,或者说都是生产者也都是消费者,通过两个队列实现了一个回调过程

 首先客户端先给一个队列中放消息,放的消息中的属性设置了replyto和correlatioin-id,replyto是为了告诉服务端一会你返回的消息放到哪一个队列中,correlatioin-id是为了确保是否响应的是同一个数据

  然后服务器从队列中拿到这个消息解析后处理请求,之后把响应消息放到replyto指定队列中并且设置correlatioin-id与客户端传送的相同

 7.发布确认(Publisher Confirms)

发布确认是RabbitMQ提供的一种确保消息能狗可靠发送到RabbitMQ服务器的机制,我们会在之后详细的说,这里只大概说一下

如果我们要使用发布确认,生产者需要将channel设置为confirm模式(channel.confirmSelect()),然后我们发布的每一条消息都会获得一个唯一的id,此id会和消息进行关联,方便确认消息的状态,当消息被服务器接收后会给我们返回一个ack给生产者(包含刚刚说的唯一id),如果没收到会返回nack

然后我们生产者又分为三种确认方式,单个确认方式,批量确认方式,异步确认方式

单个确认就是我们要等待服务器给我们返回ack或者nack后才可以发送下一条消息,批量确认是一次性发多个消息,然后服务器会给我们返回ack或者nack还有id来表示从这个id以前的数据都确认了。异步就是边发送边确认,具体我们之后会说

(二)SpringBoot整合RabbitMQ

  我们这里不全部演示,因为简单模式跟工作模式很像,路由模式和通配符模式很像,,所以这两个模式我们就不写了

 首先我们还是要先引入依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

然后我们要添加配置

spring.rabbitmq.addresses=amqp://**:w85315678@62.234.46.219:5672/test

1.工作模式

接下来我们要编写生产者代码

@RequestMapping("/producer")
@RestController
public class Produce {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("/work")
    public String work(){
        rabbitTemplate.convertAndSend("", Constant.WORK_QUEUE,"WORK");
        return "发送成功";
    }
}

然后我们看返回结果

 ,这条消息已经成功的到我们队列中了,之后我们要编写消费者代码

@Component
public class WorkConsume {
    @RabbitListener(queues = Constant.WORK_QUEUE)
    public void listenerQueue(String message){
        System.out.println("1接收到消息: "+message);
    }
    @RabbitListener(queues = Constant.WORK_QUEUE)
    public void listenerQueue2(String message){
        System.out.println("2接收到消息: "+message);
    }
}

我们已经成功接收到消息

那我们在这里讲一下用到的类RabbitTemplate 这个就是用来发送消息的,注意我们包不要导错了

再来看这个参数,首先就是交换机名称,然后就是我们要传入的routingkey用来映射到队列中,然后最后一个就是要传入的消息,可以是String类型,也可以是一个对象等等 

然后我们来看@RabbitListener这个注解,这是用来监听RabbitMQ队列的注解,通过这个注解,可以定义一个方法以便从RabbitMQ队列中接收消息,该注解支持多种参数类型,这些参数类型代表了从RabbitMQ队列中接收到的消息

1. String :返回消息的内容

2. Message ( org.springframework.amqp.core.Message ):Spring AMQP的 Message 类,返回原始的消息体以及消息的属性,如消息ID,内容,队列信息等.

 3. Channel ( com.rabbitmq.client.Channel ):RabbitMQ的通道对象,可以⽤于进⾏更 ⾼级的操作,如⼿动确认消息

如果我们接收的是Message

就大概是这个样子 

2.发布订阅模式

我们上面的发布订阅图中对比于工作模式多了一个exchange角色,之所以工作模式没有,是因为我们直接使用的是虚拟机自带的交换机

那发布订阅的代码步骤跟工作模式差不多,但是多了一个声明交换机和交换机与队列绑定的步骤

我们来看看生产者代码

@RequestMapping("/fanout")
    public String fanout(){
        rabbitTemplate.convertAndSend(Constant.FANOUT_EXCHANGE,"","WORK");
        return "发送成功";
    }

 再来看看交换机和队列绑定的代码

 @Bean("fanoutQueue1")
    public Queue fanoutQueue1(){
        return QueueBuilder.durable(Constant.FANOUT_QUEUE1).build();
    }
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2(){
        return QueueBuilder.durable(Constant.FANOUT_QUEUE2).build();
    }
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange(Constant.FANOUT_EXCHANGE).durable(true).build();
    }
    @Bean
    public Binding fanoutBinding1(FanoutExchange exchange,@Qualifier("fanoutQueue1")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
    @Bean
    public Binding fanoutBinding2(FanoutExchange exchange,@Qualifier("fanoutQueue2")Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }

我们发现已经成功发送了 

我们来写消费者的代码

3.通配符模式

 交换机类型为Direct时,会把消息交给符合指定routingkey的队列.队列和交换机的绑定,不是任意的绑定了,⽽是要指定⼀个RoutingKey(路由key)消息的发送⽅在向Exchange发送消息时,

也需要指定消息的RoutingKey Exchange也不再把消息交给每⼀个绑定的key,⽽是根据消息的RoutingKey进⾏判断,

只有队列的 RoutingKey和消息的RoutingKey能够匹配,才会接收到消息

声明交换机和队列代码

@Bean("topicExchange")
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(Constant.TOPIC_EXCHANGE).durable(true).build();
    }
    @Bean("topicQueue1")
    public Queue topicQueue1(){
        return QueueBuilder.durable(Constant.TOPIC_QUEUE1).build();
    }
    @Bean("topicQueue2")
    public Queue topicQueue2(){
        return QueueBuilder.durable(Constant.TOPIC_QUEUE2).build();
    }
    @Bean
    public Binding topicBind1(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue1")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("*.error");
    }
    @Bean
    public Binding topicBind2(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue2")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("#.info");
    }
    @Bean
    public Binding topicBind3(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("topicQueue2")Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("*.error");
    }

生产者代码

  @RequestMapping("/topic/{routingkey}")
    public String topic(@PathVariable("routingkey")String routingkey){
        rabbitTemplate.convertAndSend(Constant.TOPIC_EXCHANGE, routingkey,"topic");
        return "发送成功";
    }

 消费者代码

@Component
public class Topic {
    @RabbitListener(queues = Constant.TOPIC_QUEUE1)
    public void listenerQueue1(String message){
        System.out.println("topic1接收到消息: "+message);
    }
    @RabbitListener(queues = Constant.TOPIC_QUEUE2)
    public void listenerQueue2(String message){
        System.out.println("topic2接收到消息: "+message);
    }
}

 


原文地址:https://blog.csdn.net/huapiaoy/article/details/145017202

免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!