RabbitMQ七种工作模式之简单模式, 工作队列模式, 发布订阅模式, 路由模式, 通配符模式
文章目录
RabbitMQ 共提供了7种⼯作模式, 进⾏消息传递
一. Simple(简单模式)
P: ⽣产者, 也就是要发送消息的程序
C: 消费者,消息的接收者
Queue: 消息队列, 类似⼀个邮箱, 可以缓存消息; ⽣产者向其中投递消息, 消费者从其中取出消息.
特点: ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次. 也称为点对点(Point-to-Point)模式
适⽤场景: 消息只能被单个消费者处理
公共代码:
public class Common {
public static final String HOST = "139.9.84.204";
public static final Integer PORT = 5672;
public static final String VIRTUALHOST = "study";
public static final String USERNAME = "admin";
public static final String PASSWORD = "admin";
//简单模式
public static final String simpleQueue = "simpleQueue";
public static final String simpleMsg = "hello, simple.....";
}
生产者:
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//声明队列
//如果没有一个这样的队列, 会自动创建, 如果有, 则不创建
channel.queueDeclare(Common.simpleQueue, true, false, false, null);
for (int i = 0; i < 10; i++) {
//发送消息
String msg = Common.simpleMsg + i;
channel.basicPublish("", Common.simpleQueue, null, msg.getBytes(StandardCharsets.UTF_8));
}
//释放资源
channel.close();
connection.close();
}
}
消费者:
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(Common.simpleQueue, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("获取到队列: " + new String(body));
}
};
//消费消息
channel.basicConsume(Common.simpleQueue, true, consumer);
Thread.sleep(2000);
//释放资源
channel.close();
connection.close();
}
}
二. Work Queue(工作队列模式)
⼀个⽣产者P,多个消费者C1,C2. 在多个消息的情况下, Work Queue 会将消息分派给不同的消费者, 每个消费者都会接收到不同的消息.
特点: 消息不会重复, 分配给不同的消费者.
适⽤场景: 集群环境中做异步处理
公共代码:
//工作队列模式
public static final String workQueue = "workQueue";
public static final String workQueueMsg = "hello, workQueue.....";
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Common.HOST);
factory.setPort(Common.PORT);
factory.setVirtualHost(Common.VIRTUALHOST);
factory.setUsername(Common.USERNAME);
factory.setPassword(Common.PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Common.workQueue, true, false, false, null);
for(int i = 0; i < 10; i++){
String msg = Common.simpleMsg + i;
channel.basicPublish("", Common.workQueue, null, msg.getBytes(StandardCharsets.UTF_8));
}
channel.close();
connection.close();
}
}
消费者1, 消费者2(代码相同):
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Common.HOST);
factory.setPort(Common.PORT);
factory.setVirtualHost(Common.VIRTUALHOST);
factory.setUsername(Common.USERNAME);
factory.setPassword(Common.PASSWORD);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(Common.workQueue, true, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.workQueue, true, consumer);
// channel.close();
// connection.close();
}
}
先启动两个消费者, 再启动生产者:
三. Publish/Subscribe(发布/订阅模式)
图中X表⽰交换机, 在订阅模型中,多了⼀个Exchange角色, 过程略有变化
⼀个⽣产者P, 多个消费者C1, C2, X代表交换机消息复制多份,每个消费者接收相同的消息
⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者
适合场景: 消息需要被多个消费者同时接收的场景. 如: 实时通知或者⼴播消息
概念介绍
Exchange: 交换机 (X).
作⽤: ⽣产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中, 实际上这个在RabbitMQ中不会发⽣. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议⾥还有另外两种类型, System和⾃定义, 此处不再描述.
- Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)
- Direct:定向,把消息交给符合指定routing key的队列(Routing模式)
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列(Topics模式)
- headers类型的交换器不依赖于路由键的匹配规则来路由消息, ⽽是根据发送的消息内容中的headers属性进⾏匹配. headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.
Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
RoutingKey: 路由键.⽣产者将消息发给交换器时, 指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息.
Binding Key:绑定. RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
公共代码:
//发布订阅模式
public static final String FANOUT_EXCHANGE = "fanoutExchange";
public static final String FANOUT_QUEUE1 = "fanoutQueue1";
public static final String FANOUT_QUEUE2 = "fanoutQueue2";
生产者:
创建交换机:
//创建交换机
channel.exchangeDeclare(Common.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
参数解释:
- exchange: 交换机名称
- type: 交换机类型
- durable: 是否持久化
- autoDelete: 自动删除, ⾃动删除的前提是⾄少有⼀个队列或者交换器与这个交换器绑定, 之后所有与这个交换器绑定的队列或者交换器都与此解绑
- internal: 一般为false, 如果设置为true, 表⽰内部使⽤, 客⼾端程序⽆法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种⽅式
- arguments: 参数
创建队列:
//创建队列
channel.queueDeclare(Common.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Common.FANOUT_QUEUE2, true, false, false, null);
绑定交换机和队列:
//绑定交换机和队列
channel.queueBind(Common.FANOUT_QUEUE1, Common.FANOUT_EXCHANGE, "");
channel.queueBind(Common.FANOUT_QUEUE2, Common.FANOUT_EXCHANGE, "");
参数说明:
- queue: 队列名
- exchange: 交换机名
- routingKey: 路由key, 路由规则, 如果交换机类型为fanout,routingkey设置为"",表⽰每个消费者都可以收到全部信息
发送消息:
//发送消息
String msg = "hello, fanout......";
channel.basicPublish(Common.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
完整代码:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(Common.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
//创建队列
channel.queueDeclare(Common.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Common.FANOUT_QUEUE2, true, false, false, null);
//绑定交换机和队列
channel.queueBind(Common.FANOUT_QUEUE1, Common.FANOUT_EXCHANGE, "");
channel.queueBind(Common.FANOUT_QUEUE2, Common.FANOUT_EXCHANGE, "");
//发送消息
String msg = "hello, fanout......";
channel.basicPublish(Common.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
}
}
消费者:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.FANOUT_QUEUE1, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.FANOUT_QUEUE1, true, consumer);
}
}
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.FANOUT_QUEUE2, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.FANOUT_QUEUE2, true, consumer);
}
}
四. Routing(路由模式)
路由模式是发布订阅模式的变种, 在发布订阅基础上, 增加路由key
发布订阅模式是⽆条件的将所有消息分发给所有消费者, 路由模式是Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列
适合场景: 需要根据特定规则分发消息的场景
公共代码:
//路由模式
public static final String DIRECT_EXCHANGE = "directExchange";
public static final String DIRECT_QUEUE1 = "directQueue1";
public static final String DIRECT_QUEUE2 = "directQueue2";
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(Common.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
//创建队列
channel.queueDeclare(Common.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Common.DIRECT_QUEUE2, true, false, false, null);
//绑定交换机和队列
channel.queueBind(Common.DIRECT_QUEUE1, Common.DIRECT_EXCHANGE, "a");
channel.queueBind(Common.DIRECT_QUEUE2, Common.DIRECT_EXCHANGE, "a");
channel.queueBind(Common.DIRECT_QUEUE2, Common.DIRECT_EXCHANGE, "b");
channel.queueBind(Common.DIRECT_QUEUE2, Common.DIRECT_EXCHANGE, "c");
//发送消息
String msg1 = "hello, direct a......";
channel.basicPublish(Common.DIRECT_EXCHANGE, "a", null, msg1.getBytes(StandardCharsets.UTF_8));
String msg2 = "hello, direct b......";
channel.basicPublish(Common.DIRECT_EXCHANGE, "b", null, msg2.getBytes(StandardCharsets.UTF_8));
String msg3 = "hello, direct c......";
channel.basicPublish(Common.DIRECT_EXCHANGE, "c", null, msg3.getBytes(StandardCharsets.UTF_8));
}
}
消费者:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.DIRECT_QUEUE1, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.DIRECT_QUEUE1, true, consumer);
}
}
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.DIRECT_QUEUE2, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.DIRECT_QUEUE2, true, consumer);
}
}
五. Topics(通配符模式)
路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活.
Topics和Routing的基本原理相同,即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列. 类似于正则表达式的⽅式来定义Routingkey的模式.
不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.
适合场景: 需要灵活匹配和过滤消息的场景
公共代码:
//路由模式
public static final String TOPIC_EXCHANGE = "topicExchange";
public static final String TOPIC_QUEUE1 = "topicQueue1";
public static final String TOPIC_QUEUE2 = "topicQueue2";
生产者:
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建交换机
channel.exchangeDeclare(Common.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
//创建队列
channel.queueDeclare(Common.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Common.TOPIC_QUEUE2, true, false, false, null);
//绑定交换机和队列
channel.queueBind(Common.TOPIC_QUEUE1, Common.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Common.TOPIC_QUEUE2, Common.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Common.TOPIC_QUEUE2, Common.TOPIC_EXCHANGE, "c.#");
//发送消息
String msg1 = "hello, direct hello.a.b......";
channel.basicPublish(Common.TOPIC_EXCHANGE, "hello.a.b", null, msg1.getBytes(StandardCharsets.UTF_8));//匹配队列1,2
String msg2 = "hello, direct hello.world.b......";
channel.basicPublish(Common.TOPIC_EXCHANGE, "hello.world.b", null, msg2.getBytes(StandardCharsets.UTF_8));//匹配队列2
String msg3 = "hello, direct c.hello.b......";
channel.basicPublish(Common.TOPIC_EXCHANGE, "c.hello.b", null, msg3.getBytes(StandardCharsets.UTF_8));//匹配队列2
}
}
消费者:
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.TOPIC_QUEUE1, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.TOPIC_QUEUE1, true, consumer);
}
}
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost(Common.HOST);//ip
factory.setPort(Common.PORT);//端口号
factory.setVirtualHost(Common.VIRTUALHOST);//虚拟机名称
factory.setUsername(Common.USERNAME);//用户名
factory.setPassword(Common.PASSWORD);//密码
//3. 创建连接connection
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare(Common.TOPIC_QUEUE2, true, false, false, null);
//消费消息
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume(Common.TOPIC_QUEUE2, true, consumer);
}
}
原文地址:https://blog.csdn.net/m0_73992740/article/details/144281571
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!