自学内容网 自学内容网

消息队列-rabbitmq

摘要:本文将对rabbitmq进行介绍,涉及到其队列和交换机等核心概念,其底层的工作原理,如何解决消息重复消息积压等问题,如何保证消息的可靠性等经典的消息队列问题。最后,本文以一个Spring Boot框架中使用rabbitmq来对rabbitmq进行实战。
关键词:消息队列,rabbitmq,Spring Boot,Java

rabbitmq面试题

  1. rabbitmq是什么
    rabbitmq是一个开源的消息中间件,基于erlang语言开发,实现了高度灵活的消息队列模式,实现了AMQP协议(高级消息队列协议)。
  2. AMQP协议
    AMQP协议是一个独立于语言的消息队列协议,其中包括一些交换机、队列等概念,而rabbitmq是一个最有名的实现。
  3. rabbitmq整体架构和核心概念
    如下图是rabbitmq的整体架构图。
    在这里插入图片描述
    首先生产者发送消息时,需要指定发送到那一个交换机,并且指定RoutingKey路由键,消费者负责消费队列中的消息,多个消息者消费同一个队列,可以采用轮询的方式来分配消息,broker可以理解为一个rabbitmq实例。在broker内部,有交换机和队列,交换机收到生产者的消息,根据标签中的路由键和自身的类型,去匹配各个队列的BoudingKey绑定键,从而将消息转发到对应的队列中。
  4. 交换机的类型
  • Fanout Exchange:这种交换机下会把所有消息广播到所有绑定在这个交换机上的队列
  • Direct Exchange:最常用的,原理就是根据路由键和绑定键的完全匹配,路由到指定的队列
  • Topic Exchange:此时绑定键支持正则匹配,可以含有一些匹配符,某个消息的路由键模糊匹配到了对应的规则,机会被转发到这个队列
  • Headers Exchange:根据消息的Headers属性进行路由,现在已经很少使用
  1. rabbitmq工作原理
    首先是生产者通过TCP连接到rabbitmq的broker上,并且在其上面建立虚拟连接信道进行通信。生产者在发送消息的时候,需要指定交换机和路由键,broker收到消息后,由对应的交换机根据路由键匹配队列,并且将消息转发到对应的队列中。如果找不到匹配的队列,则可能将消息丢弃或者回退给生产者。
    消费者消费消息也是通过TCP连接到rabbitmq的broker上,然后指定队列,消费那个队列中的消息。
    注意无论是生产者还是消费者,都是在tcp连接上建立的信道,每一个信道都有一个唯一的信道id,多个信道可以复用一个tcp连接,每一个信道对应一个操作rabbitmq的线程,这样做的好处是节省了tcp连接的开销,有点类似于http2中的stream id。
  2. 死信交换机和死信队列
    首先是死信的概念,死信指的是一些无法正常被消费者消费的消息,有以下三种情况:
  • 消息被拒绝
  • 消息在队列中的时间超过了设置的生存时间
  • 队列达到最大长度
    成为死信的消息会被死信交换机DLX交换到绑定在DLX上的死信队列DLQ上。
    死信队列的应用场景如下:
  • 消息重试:某个消息消费者消费失败或者拒绝,可以将该消息重新路由到死信队列中,进行处理
  • 延迟队列:如果一个消息想要控制延时x秒执行,可以将这个消息的过期时间设置为x,然后路由到一个无消费者的队列中,那么当x秒后这个消息过期后,就会被路由到死信队列中,这是消费者按照顺序消费死信队列中的消息就可以实现延迟队列。
  • 异常监控处理:将系统中产生的异常消息全部路由到死信队列中,然后监控处理系统专门消费这个死信队列中的消息
  1. 优先级队列
    优先级队列就是可以根据消息的优先级进行排序和处理的队列,可以通过设置队列的参数,然后每一个消息带有一个优先级系数。
  2. 惰性队列
    一般的队列消息都有存放在内存中,如果消息过多过大则容易引起内存溢出的问题。解决此类问题的关键就是惰性队列,惰性队列的消息是存放在磁盘中,只有当需要的时候才会被加载进内存。
  3. rabbitmq如何保证消息的可靠性
  • 从生产者发送消息的角度:将信道设置为确认模式,消息只有被broker成功接收才返回一个ack,生产者根据ack来重试发送消息,直到消息被broker成功接收
  • 从broker的角度:消息和队列都设置为持久化模式,将数据持久化到磁盘;rabbitmq部署为集群模式,并且通过镜像队列在多个节点之间同步和复制,避免单点故障
  • 从消费者的角度:使用手动确认,只有当消费者成功消费这一个消息后,给broker发送确认,broker才能删除这条消息
  1. 各种消息队列的比较
  • ActiveMQ: 基于JMS(Java消息服务)标准,高并发时性能有所受限
  • RabbitMQ:实现了AMQP协议,具有高可靠性、灵活性和可扩展性
  • RocketMQ:阿里巴巴开源的分布式消息中间件服务,实现了基于主题的发布订阅模式,具有高吞吐量低延迟的特点
  • kafka:高吞吐量、低延迟,适用于大数据场景、实时数据分析

rabbitmq实战

这里以一个非常经典的场景来展示实践。在注册账号的过程中,可能会用到邮箱来发送验证码,这个时候,发送邮件这个操作是在注册过程中同步进行,发送邮件完了在返回,还是异步调用呢?显然后者是对用户更加友好的。

环境准备

实验平台:ubuntu20.04, 已安装docker和git
首先执行下面的命令,通过docker容器部署的方式搭建rabbitmq。

环境准备

docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=myuser -e RABBITMQ_DEFAULT_PASS=mypassword rabbitmq:3-management

主要代码

以下代码定义了一个消费者专门消费email队列中的消息,然后调用相关邮件服务发送邮件。

@Component
public class RabbitmqListener {
    @Autowired
    EmailService emailService;

    @Autowired
    ElasticsearchService elasticsearchService;

    @RabbitListener(queues = "email")
    public void onEmailMessageFromEmailQueue(EmailMessage emailMessage) throws Exception {
        emailService.sendEmail(emailMessage.email, emailMessage.msg);
    }
}

以下方法为生产者代码,如下:

public void sendCode(String email) {
        String code_ = Utils.createCode();
        String key = email + code_;
        try {
            redisService.set(key, "1");
            redisService.expire(key, 180);
        } catch(Exception e) {
            redisService.del(key);
            log.error("发送验证码失败", e.getMessage());
            return;
        }
        String msg = String.format("你好,你这次操作的验证码为%s", code_);
        EmailMessage emailMessage = new EmailMessage(email, msg);
        rabbitTemplate.convertAndSend("wuHouBlog", "email", emailMessage);
    }

参考

-RabbitMQ 常见面试题


原文地址:https://blog.csdn.net/rtffcggh/article/details/144065531

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!