自学内容网 自学内容网

RabbitMQ的发布订阅模式

发布订阅模式

image-20230810180012118

是群发的概念,每条消息可以发送给多个消费者

在订阅模型中,多了一个 Exchange 角色

Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

  1. Fanout:广播,将消息交给所有绑定到交换机的队列

  2. Direct:定向,把消息交给符合指定routing key 的队列

  3. Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失

Exchange和Queue之间还需要绑定才能发送消息

生产者代码
public class PSProducer {
public static void main(String[] args) throws Exception {
//创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
ccf.setPort(5672);//可选,5672是默认端口
f.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//定义交换机
//服务器中如果没有交换机,就创建,有就直接使用
cc.exccangeDeclare("logs", "fanout");
//向交换机发送数据
while(true) {
System.out.print("输入:");
String s = new Scanner(System.in).nextLine();
cc.basicPublish("logs", "", null, s.getBytes());
}
}
}
消费者代码
public class PSConsumer {
public static void main(String[] args) throws Exception {
//创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
ccf.setPort(5672);//可选,5672是默认端口
f.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//定义交换机
//服务器中如果没有交换机,就创建,有就直接使用
cc.exccangeDeclare("logs", "fanout");
//cc.queueDeclare(UUID.randomUUID().toString(), false, true, true, null);
String queue = cc.queueDeclare().getQueue();
cc.queueBind(queue, "logs", "");

//处理数据
DeliverCallback deliverCallback = new DeliverCallback() {

@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] a = message.getBody();
String b = new String(a);
System.out.println("收到:"+b);
System.out.println("消息处理完毕");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {

}
};
//3.接收数据
/**第二个参数:
 *         true:自动确认
 *         false:手动确认
 */
cc.basicConsume(queue,true, deliverCallback, cancelCallback);
}
}

上一篇文章:RabbitMQ的工作模式-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/Z0412_J0103/article/details/143354823下一篇文章: 


原文地址:https://blog.csdn.net/Z0412_J0103/article/details/143354922

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!