自学内容网 自学内容网

035 RabbitMQ五种工作模式

简单模式:一个生产者,一个消费者,不需要设置交换机(使用默认的交换机)
工作队列模式:一个生产者,多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
发布订阅模式Publish/subscribe:需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息广播发送到绑定的队列
路由模式:需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
通配符模式:需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
注意Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。

简单模式

rabbit-provider

Demo01TestSimpleQueue.java

package com.cubemall.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo01TestSimpleQueue {
    //目标:编写生产者发送消息到消息队列
    //实现过程:导入Rabbit模板对象,调用api接口
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMsg(){
        /**
         * 参数1: 队列名称
         * 参数2: 消息内容
         */
        rabbitTemplate.convertAndSend("simple_queue","hello 小兔子");
    }
}

rabbit-consumer

SimpleListener.java

package com.cubemall.listeners.simpleQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("接收消息 == " + msg);
    }
}

RabbitMQConsumerApplication.java

package com.cubemall;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RabbitMQConsumerApplication.class,args);
    }
}

工作队列模式

rabbit-provider

Demo02TestWorkQueue.java

package com.cubemall.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo02TestWorkQueue {
    //目标:编写生产者发送消息到消息队列
    //实现过程:导入Rabbit模板对象,调用api接口
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMsg(){
        /**
         * 参数1: 队列名称
         * 参数2: 消息内容
         */
        for (int i = 0; i < 10000; i++){
            rabbitTemplate.convertAndSend("work_queue","hello 小兔子");
        }

    }
}

rabbit-consumer

WorkListener01.java

package com.cubemall.listeners.workQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener01 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("work_queue 01 接收消息 == " + msg);
    }
}

WorkListener02.java

package com.cubemall.listeners.workQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "work_queue")
public class WorkListener02 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("work_queue 02 接收消息 == " + msg);
    }
}

发布与订阅模式(fanout)

rabbit-provider

Demo03TestFanoutExchange.java

package com.cubemall.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo03TestFanoutExchange {
    //目标: 通过发布订阅模式,发送消息到交换机
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMsg(){
        /**
         * 参数1: 交换机
         * 参数2: 路由键
         * 参数3: 消息内容
         */
        for (int i = 0; i < 100; i++){
            rabbitTemplate.convertAndSend("fanout_exchange","","hello 小兔子");
        }



    }
}

rabbit-consumer

FanoutListener01.java

package com.cubemall.listeners.fanoutExchange;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "fanout_queue1")
public class FanoutListener01 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("fanoutqueue1 接收消息 == " + msg);
    }
}

FanoutListener02.java

package com.cubemall.listeners.fanoutExchange;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "fanout_queue2")
public class FanoutListener02 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("fanoutqueue2 接收消息 == " + msg);
    }
}

路由模式(direct)

rabbit-provider

Demo04TestRoutingExchange.java

package com.cubemall.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo04TestRoutingExchange {
    //目标: 通过发布订阅模式,发送消息到交换机
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMsg(){
        /**
         * 参数1: 交换机
         * 参数2: 路由键
         * 参数3: 消息内容
         */
        for (int i = 0; i < 100; i++){

            if (i % 2 == 0) {
                rabbitTemplate.convertAndSend("routing_exchange","info","hello 小兔子 ["+i+"] +info");
            } else {
                rabbitTemplate.convertAndSend("routing_exchange","error","hello 小兔子 ["+i+"] +error");
            }



        }



    }
}

rabbit-consumer

RoutingListener1.java

package com.cubemall.listeners.routingQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener1 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("routingqueue1 接收消息 == " + msg);
    }
}

RoutingListener2.java

package com.cubemall.listeners.routingQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "routing_queue2")
public class RoutingListener2 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("routing_queue2 接收消息 == " + msg);
    }
}

通配符模式(topic)

rabbit-provider

Demo05TestTopicExchange.java

package com.cubemall.test;


import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo05TestTopicExchange {
    //目标: 通过发布订阅模式,发送消息到交换机
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMsg(){
        /**
         * 参数1: 交换机
         * 参数2: 路由键
         * 参数3: 消息内容
         */

        rabbitTemplate.convertAndSend("topic_exchange","item.insert","hello 小兔子 [item.insert]");

        rabbitTemplate.convertAndSend("topic_exchange","item.insert.abc","hello 小兔子 [item.insert.abc]");







    }
}

rabbit-consumer

TopicListener1.java

package com.cubemall.listeners.topicQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "topic_queue1")
public class TopicListener1 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("topic_queue1 接收消息 == " + msg);
    }
}

TopicListener2.java

package com.cubemall.listeners.topicQueue;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
消费者消息队列监听器
 */
@Component
@RabbitListener(queues = "topic_queue2")
public class TopicListener2 {
    //处理消息方法
    @RabbitHandler
    public void simpleHandler(String msg){
        System.out.println("topic_queue2 接收消息 == " + msg);
    }
}


原文地址:https://blog.csdn.net/m0_46695127/article/details/143197512

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!