自学内容网 自学内容网

MQ(仅供自己参考)

同步通讯的优缺点:

优点:时效高,数据一致,过程简单

缺点:耦合度高。性能下降。CPU等待资源的浪费。级联失败。

2、异步通讯:异步调用常见的实现就是事件驱动模式

异步的优缺点:

优点:耦合度低 ,吞吐量提升 ,故障隔离(不在联机发生失败) ,流量削峰(Broker缓存事件,让其他服务慢慢来执行)

缺点:依赖于Broker的可靠性、安全性、吞吐量

架构复杂,没有明显的流程线,不好追踪管理

一般的项目都是使用同步通讯,因为需要拿到其他服务的返回结果,而异步通讯只是通知他有事情要做,而不需要他返回结果。

三、RabbitMq:

是一个开源的消息代理软件,广泛用于实现消息传递和异步处理。它基于消息队列的设计理念,允许不同的应用程序或服务之间进行通信,而无需它们直接相互依赖。

主要特点:

  1. 可靠性:RabbitMQ 提供消息确认机制,确保消息在处理过程中不会丢失。

  2. 灵活的路由:通过交换机和队列的配置,RabbitMQ 支持多种消息路由模式,包括点对点和发布/订阅。

  3. 多协议支持:支持多种消息协议,如 AMQP、STOMP 和 MQTT。

  4. 高可用性:通过集群和镜像队列功能,可以实现高可用性和负载均衡。

  5. 管理界面:提供用户友好的管理界面,便于监控和管理消息队列。

几种模式:

AMQP:

AMQP(Advanced Message Queuing Protocol)是一种开放标准的消息传递协议,旨在支持消息的可靠传递和异步通信。它允许不同的应用程序通过消息代理进行通信,无论这些应用程序使用的编程语言或平台如何。

AMQP 的主要特点包括:

  1. 消息队列:支持将消息存储在队列中,允许消费者异步处理消息。
  2. 发布/订阅模式:支持多种消息传递模式,包括点对点和发布/订阅。
  3. 可靠性:提供消息确认机制,确保消息不会丢失。
  4. 灵活性:支持不同的消息传递场景,适用于多种分布式系统。
  5. 跨平台:由于其开放性,AMQP 可以在不同的操作系统和语言之间进行互操

RabbitMQ的依赖,yml配置:

   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>




spring:
  rabbitmq:
    host: 192.168.136.128   #地址
    port: 5672                #端口
    username: root        #用户账号
    password: root        #用户密码
    virtual-host: /        

RabbitMQ:发送和接收:

发送

package org.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessage()
    {
        rabbitTemplate.convertAndSend("simple.cc","hello,RabbitMq");
    }

}

接收

package org.example.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SimpleQueueListener {

    @RabbitListener(queues = "simple.cc")
    public void getMessage(String message)
    {
        log.info(message);
    }
}

2、workqueue:

代码:workqueue有预取机制,当一个没有设置预取上限,那么两个消费者会均分消息,即使不能立即处理也会将消息拿到。当设置prefetch的值那么就会按这个上限那取一定数量的消息,将消息处理完成之后,再从队列中拿消息。

package org.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void sendWorkQueue()
    {
        for (int i = 0; i < 50; i++) {
            rabbitTemplate.convertAndSend("simple.cc","hello,--"+ i);
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

}




package org.example.listener;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SimpleQueueListener {

    //@RabbitListener(queues = "simple.cc")
    //public void getMessage(String message)
    //{
    //    log.info(message);
    //}

    @RabbitListener(queues = "simple.cc")
    public void getWorkQueueMessage(String message)
    {
        log.info(message);
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @RabbitListener(queues = "simple.cc")
    public void getWorkQueueMessages2(String message) {
        log.error(message);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}




3、发布订阅模式:

4、Fanout交换机:将消息路由到每一个队列,缺点是exchange只是转发消息,而不保存,如果路由的时候丢失消息那么消息就直接丢失了。

代码:

    @RabbitListener(queues = "fanout.queue1")
    public void getFanoutMessage(String message)
    {
        log.info(message);
    }

    @RabbitListener(queues = "fanout.queue2")
    public void getFanoutMessage1(String message)
    {
        log.info(message);
    }



package org.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {

    @Bean
    public FanoutExchange fanoutExchange()
    {
        return new FanoutExchange("cc.fanout");
    }

    @Bean
    public Queue fanoutQueue1()
    {
        return new Queue("fanout.queue1");
    }

    @Bean
    public Queue fanoutQueue2()
    {
        return new Queue("fanout.queue2");
    }

    @Bean
    public Binding binding(Queue fanoutQueue1,FanoutExchange fanoutExchange)
    {
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    @Bean
    public Binding binding2(Queue fanoutQueue2,FanoutExchange fanoutExchange)
    {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}




    @Test
    public void sendFanoutMessage()
    {
        rabbitTemplate.convertAndSend("cc.fanout","","hello,fanout");
    }

5、Direct:可以通过key与相应的queue绑定,绑定之后相应的key只能发送到相应的queue上。当然一个queue可以绑定多个key,那么就可以实现广播(fanout)

代码:

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "cc.direct",type = ExchangeTypes.DIRECT),
            key = {"blue","red"}
    ))
    public void getDirectMessage1(String message)
    {
        log.info(message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "cc.direct",type = ExchangeTypes.DIRECT),
            key = {"yellow","red"}
    ))
    public void getDirectMessage2(String message)
    {
        log.info(message);
    }



  @Test
    public void sendFanoutMessage()
    {
        rabbitTemplate.convertAndSend("cc.direct","red","hello,direct");
        rabbitTemplate.convertAndSend("cc.direct","blue","hello,direct,blue");
        rabbitTemplate.convertAndSend("cc.direct","yellow","hello,direct,yellow");
    }

6、Topic:通过通配符来匹配某一类消息,通过key的匹配来选择不需要的某一类消息

代码:

    @Test
    public void sendTopicMessage()
    {
        rabbitTemplate.convertAndSend("cc.topic","china.cc","hello,china,cc");
        rabbitTemplate.convertAndSend("cc.topic","china.news","china,weather");
        rabbitTemplate.convertAndSend("cc.topic","japan.news","japan,weather");
    }
  @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "cc.topic",type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void getTopicMessage1(String message)
    {
        log.info(message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "cc.topic",type = ExchangeTypes.TOPIC),
            key = "*.news"
    ))
    public void getTopicMessage2(String message)
    {
        log.info(message);
    }

7、SpringAMQP:消息转换器,默认使用jdk的序列化

引依赖:

<!--json-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.0</version>
        </dependency>

添加配置,覆盖原本的序列化方式:

 @Bean
    public MessageConverter messageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }


原文地址:https://blog.csdn.net/qq_61839608/article/details/142702426

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!