Java重要面试名词整理(十三):RocketMQ
文章目录
简述
RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。
早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时,由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。
优点 | 缺点 | 适合场景 | |
---|---|---|---|
Apache Kafka | 吞吐量非常大,性能非常好,集群高可用。 | 会有丢数据的可能,功能比较单一 | 日志分析、大数据采集 |
RabbitMQ | 消息可靠性高,功能全面。 | erlang语言不好定制。吞吐量比较低。 | 企业内部小规模服务调用 |
Apache Pulsar | 基于Bookeeper构建,消息可靠性非常高。 | 周边生态还有差距,目前使用的公司比较少。 | 企业内部大规模服务调用 |
Apache RocketMQ | 高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发,方便定制。 | 服务加载比较慢。 | 几乎全场景,特别适合金融场景 |
环境搭建
RocketMQ的分布式集群基于主从架构搭建。在多个服务器组成的集群中,指定一部分节点作为Master节点,负责响应客户端的请求。指令另一部分节点作为Slave节点,负责备份Master节点上的数据,这样,当Master节点出现故障时,在Slave节点上可以保留有数据备份,至少保证数据不会丢失。
分布式集群配置
- brokerClusterName: 集群名。RocketMQ会将同一个局域网下所有brokerClusterName相同的服务自动组成一个集群,这个集群可以作为一个整体对外提供服务
- brokerName: Broker服务名。同一个RocketMQ集群当中,brokerName相同的多个服务会有一套相同的数据副本。同一个RocketMQ集群中,是可以将消息分散存储到多个不同的brokerName服务上的。
- brokerId: RocketMQ中对每个服务的唯一标识。RocketMQ对brokerId定义了一套简单的规则,master节点需要固定配置为0,负责响应客户端的请求。slave节点配置成其他任意数字,负责备份master上的消息。
- brokerRole: 服务的角色。这个属性有三个可选项:ASYNC_MASTER,SYNC_MASTER和SLAVE。其中,ASYNC_MASTER和SYNC_MASTER表示当前节点是master节点,目前暂时不用关心他们的区别。SLAVE则表示从节点。
- namesrvAddr: nameserver服务的地址。nameserver服务默认占用9876端口。多个nameserver地址用;隔开。
升级高可用集群
在Dledger集群中,就不再单独指定各个broker的服务,而是由这些broker服务自行进行选举,产生一个Leader角色的服务,响应客户端的各种请求。而其他的broker服务,就作为Follower角色,负责对Leader上的数据进行备份。当然,Follower所要负责的事情,比主从架构中的SLAVE角色会要复杂一点,因为这种节点选举是在后端不断进行的,他们需要随时做好升级成Leader的准备。
Dledger集群的选举是通过Raft协议进行的,Raft协议是一种多数同意机制。也就是每次选举需要有集群中超过半数的节点确认,才能形成整个集群的共同决定。同时,这也意味着在Dledger集群中,只要有超过半数的节点能够正常工作,那么整个集群就能正常工作。因此,在部署Dledger集群时,通常都是部署奇数台服务,这样可以让集群的容错性达到最大。
Dledger集群机制是RocketMQ自4.5版本开始支持的一个重要特性。他其实是由OpenMessage组织带入RocketMQ的一个系列框架。他是一个为高可用、高性能、高可靠的分布式存储系统提供基础支持的组件。他做的事情主要有两个,一是在集群中选举产生master节点。RocketMQ集群需要用这个master节点响应客户端的各种请求。二是在各种复杂的分布式场景下,保证CommitLog日志文件在集群中的强一致性。
其背后的核心就是Raft协议。这是一种强大的分布式选举算法,其核心是只要集群中超过半数的节点作出的共同决议,就认为是集群最终的共同决议。
Dledger集群如何防止集群脑裂问题?
DLedger集群通过使用Raft协议来防止集群脑裂(split-brain)问题。脑裂问题是指在分布式系统中,由于网络分区或其他原因导致集群被分割成两个或多个子集群,各自独立运行且无法感知到其他子集群的存在。这可能导致数据不一致和错误决策。Raft协议采用了一系列措施来避免脑裂问题的发生:
- 选举机制:Raft协议的基础是选举出一个领导者(Leader),其他节点(Follower)都从领导者获取数据。选举过程要求候选人必须获得集群中大多数节点的支持才能成为领导者。这确保了集群中只能有一个领导者,从而避免了脑裂问题。
- 任期(Term):Raft协议为每个选举周期设置了一个递增的任期编号。任期编号用于标识当前的领导者,确保旧的领导者不会再次被选为领导者。如果一个节点发现自己的任期小于其他节点,那么它会停止当前的工作并更新自己的任期。
- 心跳机制:领导者会定期向其他节点发送心跳消息,以保持与Follower节点的连接。当一个节点长时间未收到领导者的心跳时,它会认为当前领导者失效,并启动新一轮选举。这确保了当领导者出现故障时,系统能够快速地选出新的领导者。
- 日志复制:领导者负责将数据更新(日志条目)复制到其他节点。Follower节点只有在收到领导者的日志条目并将其写入本地日志后,才会响应客户端的请求。这确保了在发生脑裂情况下,不会出现多个节点试图同时修改同一份数据的情况。
通过以上措施,DLedger集群利用Raft协议避免了脑裂问题的发生,保证了系统的高可用性和数据一致性。
完整梳理一下RocketMQ中各个组件的作用:
1、nameServer 命名服务
在我们之前的实验过程中,你会发现,nameServer不依赖于任何其他的服务,自己独立就能启动。并且,不管是broker还是客户端,都需要明确指定nameServer的服务地址。以一台电脑为例,nameServer可以理解为是整个RocketMQ的CPU,整个RocketMQ集群都要在CPU的协调下才能正常工作。
2、broker 核心服务
从之前的实验过程中你会发现,broker是RocketMQ中最为娇贵的一个组件。RockeMQ提供了各种各样的重要设计来保护broker的安全。同时broker也是RocketMQ中配置最为繁琐的部分。同样以电脑为例,broker就是整个RocketMQ中的硬盘、显卡这一类的核心硬件。RocketMQ最核心的消息存储、传递、查询等功能都要由broker提供。
3、client 客户端
Client包括消息生产者和消息消费者。同样以电脑为例,Client可以认为是RocketMQ中的键盘、鼠标、显示器这类的输入输出设备。鼠标、键盘输入的数据需要传输到硬盘、显卡等硬件才能进行处理。但是键盘、鼠标是不能直接将数据输入到硬盘、显卡的,这就需要CPU进行协调。通过CPU,鼠标、键盘就可以将输入的数据最终传输到核心的硬件设备中。经过硬件设备处理完成后,再通过CPU协调,显示器这样的输出设备就能最终从核心硬件设备中获取到输出的数据。
RocketMQ的消息模型
在RocketMQ的这个消息模型当中,最为核心的就是Topic。对于客户端,Topic代表了一类有相同业务规则的消息。对于Broker,Topic则代表了系统中一系列存储消息的资源。所以,RocketMQ对于Topic是需要做严格管理的。如果任由客户端随意创建Topic,那么服务端的资源管理压力就会非常大。
而对于业务来说,最为重要的就是消息Message了。生产者发送到某一个Topic下的消息,最终会保存在Topic下的某一个MessageQueue中。而消费者来消费消息时,RocketMQ会在Broker端给每个消费者组记录一个消息的消费位点Offset。通过Offset控制每个消费者组的消息处理进度。这样,每一条消息,在一个消费者组当中只被处理一次。
RocketMQ的客户端编程模型相对比较固定,基本都有一个固定的步骤。掌握这个固定步骤,对于学习其他复杂的消息模型也是很有帮助的。
基本流程
- 消息生产者的固定步骤
1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址
3.启动producer。 这个步骤比较容易忘记。可以认为这是消息生产者与服务端建立连接的过程。
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer,释放资源。
- 消息消费者的固定步骤
1.创建消费者Consumer,必须指定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer。消费者会一直挂起,持续处理消息。
消息确认机制
1、消息生产端采用消息确认加多次重试的机制保证消息正常发送到RocketMQ
针对消息发送的不确定性,封装了三种发送消息的方式。
第一种称为单向发送
单向发送方式下,消息生产者只管往Broker发送消息,而全然不关心Broker端有没有成功接收到消息。这就好比生产者向Broker发一封电子邮件,Broker有没有处理电子邮件,生产者并不知道。
第二种称为同步发送
同步发送方式下,消息生产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。这就好比生产者给Broker打了个电话。通话期间生产者就停下手头的事情,直到Broker明确表示消息处理成功了,生产者才继续做其他的事情。
第三种称为异步发送
异步发送机制下,生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。这就好比生产者向Broker发电子邮件通知时,另外找了一个代理人专门等待Broker的响应。而生产者自己则发完消息后就去做其他的事情去了。
2、消息消费者端采用状态确认机制保证消费者一定能正常处理对应的消息
3、消费者也可以自行指定起始消费位点
Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset
广播消息
广播模式和集群模式是RocketMQ的消费者端处理消息最基本的两种模式。集群模式下,一个消息,只会被一个消费者组中的多个消费者实例 共同 处理一次。广播模式下,一个消息,则会推送给所有消费者实例处理,不再关心消费者组。
顺序消息机制
基础思路:只有放到一起的一批消息,才有可能保持消息的顺
1、生产者只有将一批有顺序要求的消息,放到同一个MesasgeQueue上,Broker才有可能保持这一批消息的顺序。
2、消费者只有一次锁定一个MessageQueue,拿到MessageQueue上所有的消息,
延迟消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
虽然不太起眼,但是这是RocketMQ非常有特色的一个功能。对比下RabbitMQ和Kafka。RabbitMQ中只能通过使用死信队列变相实现延迟消息,或者加装一个插件来支持延迟消息。 Kafka则不太好实现延迟消息。
延迟消息的难点其实是性能,需要不断进行定时轮询。全部扫描所有消息是不可能的,RocketMQ的实现方式是预设一个系统Topic,名字叫做SCHEDULE_TOPIC_XXXX。在这个Topic下,预设18个延迟队列。然后每次只针对这18个队列里的消息进行延迟操作,这样就不用一直扫描所有的消息了。
18个延迟级别虽然无法调整,但是每个延迟级别对应的延迟时间其实是可以调整的。只需要修改截图中的参数就行。不过通常不建议这么做。
批量消息
生产者要发送的消息比较多时,可以将多条消息合并成一个批量消息,一次性发送出去。这样可以减少网络IO,提升消息发送的吞吐量。
过滤消息
同一个Topic下有多种不同的消息,消费者只希望关注某一类消息。
Tag属性的处理比较简单,就是直接匹配。而SQL语句的处理会比较麻烦一点。RocketMQ也是通过ANLTR引擎来解析SQL语句,然后再进行消息过滤的。
ANLTR是一个开源的SQL语句解析框架。很多开源产品都在使用ANLTR来解析SQL语句。比如ShardingSphere,Flink等。
事务消息
事务消息是RocketMQ非常有特色的一个高级功能。他的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据一致性。
具体的实现思路
- 生产者将消息发送至Apache RocketMQ服务端。
- Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
- 生产者开始执行本地事务逻辑。
- 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
- 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
- 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
- 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
- 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
- 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
ACL权限控制机制
RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说,RocketMQ作为一个内部服务,是不需要进行权限控制的,但是,如果要通过RocketMQ进行跨部门甚至跨公司的合作,权限控制的重要性就显现出来了。
调优
死信队列的特征:
- 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
- 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
- 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
- 死信队列中的消息不会再被消费者正常消费。
- 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。
注:默认创建出来的死信队列,他里面的消息是无法读取的,在控制台和消费者中都无法读取。这是因为这些默认的死信队列,他们的权限perm被设置成了2:禁读(这个权限有三种 2:禁读,4:禁写,6:可读可写)。需要手动将死信队列的权限配置成6,才能被消费(可以通过mqadmin指定或者web控制台)。
消费者端进行幂等控制
在MQ系统中,对于消息幂等有三种实现语义:
- at most once 最多一次:每条消息最多只会被消费一次
- at least once 至少一次:每条消息至少会被消费一次
- exactly once 刚刚好一次:每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
其中,at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
而at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。
而这个exactly once是MQ中最理想也是最难保证的一种语义,需要有非常精细的设计才行。RocketMQ只能保证at least once,保证不了exactly once。所以,使用RocketMQ时,需要由业务系统自行保证消息的幂等性。
我们知道,在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。
而要处理这个问题,RocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。
但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。
核心
Producer有两种:
- 一种是普通发送者:DefaultMQProducer。只负责发送消息,发送完消息,就可以停止了。
- 另一种是事务消息发送者: TransactionMQProducer。支持事务消息机制。需要在事务消息过程中提供事务状态确认的服务,这就要求事务消息发送者虽然是一个客户端,但是也要完成整个事务消息的确认机制后才能退出。
客户端负载均衡
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
每次分配时,都会将MessageQueue和消费者ID进行排序后,再用不同的分配算法进行分配。内置的分配的算法共有六种
-
AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。
-
AllocateMessageQueueAveragely:平均分配。将所有MessageQueue平均分给每一个消费者
-
AllocateMessageQueueAveragelyByCircle: 轮询分配。轮流的给一个消费者分配一个MessageQueue。
-
AllocateMessageQueueByConfig: 不分配,直接指定一个messageQueue列表。类似于广播模式,直接指定所有队列。
-
AllocateMessageQueueByMachineRoom:按逻辑机房的概念进行分配。又是对BrokerName和ConsumerIdc有定制化的配置。
-
AllocateMessageQueueConsistentHash。源码中有测试代码AllocateMessageQueueConsitentHashTest。这个一致性哈希策略只需要指定一个虚拟节点数,是用的一个哈希环的算法,虚拟节点是为了让Hash数据在换上分布更为均匀。
广播模式
广播模式实现的关键是将消费者的消费偏移量不再保存到broker当中,而是保存到客户端当中,由客户端自行维护自己的消费偏移量。
消息持久化设计
存储文件主要分为三个部分:
- CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
- ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
- IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程
另外,还有几个辅助的存储文件,主要记录一些描述消息的元数据:
- checkpoint:数据存盘检查点。里面主要记录commitlog文件、ConsumeQueue文件以及IndexFile文件最后一次刷盘的时间戳。
- config/*.json:这些文件是将RocketMQ的一些关键配置信息进行存盘保存。例如Topic配置、消费者组配置、消费者组消息偏移量Offset 等等一些信息。
- abort:这个文件是RocketMQ用来判断程序是否正常关闭的一个标识文件。正常情况下,会在启动时创建,而关闭服务时删除。但是如果遇到一些服务器宕机,或者kill -9这样一些非正常关闭服务的情况,这个abort文件就不会删除,因此RocketMQ就可以判断上一次服务是非正常关闭的,后续就会做一些数据恢复的操作。
简单来说,Producer发过来的所有消息,不管是属于那个Topic,Broker都统一存在CommitLog文件当中,然后分别构建ConsumeQueue文件和IndexFile两个索引文件,用来辅助消费者进行消息检索。
同步刷盘采用的是GroupCommitService子线程。虽然是叫做同步刷盘,但是从源码中能看到,他实际上并不是来一条消息就刷一次盘。而是这个子线程每10毫秒执行一次doCommit方法,扫描文件的缓存。只要缓存当中有消息,就执行一次Flush操作。
而异步刷盘采用的是FlushRealTimeService子线程。这个子线程最终也是执行Flush操作,只不过他的执行时机会根据配置进行灵活调整。所以可以看到,这里异步刷盘和同步刷盘的最本质区别,实际上是进行Flush操作的频率不同。
我们经常说使用RocketMQ的同步刷盘,可以保证Broker断电时,消息不会丢失。但是可以看到,RocketMQ并不可能真正来一条消息就进行一次刷盘,这样在海量数据下,操作系统是承受不了的。而只要不是来一次消息刷一次盘,那么在Broker直接断电的情况接下,就总是会有内存中的消息没有刷入磁盘的情况,这就会造成消息丢失。所以,对于消息安全性的设计,其实是重在取舍,无法做到绝对。
文件结构: 每个ConsumeQueue文件固定由30万个固定大小20byte的数据块组成,数据块的内容包括:msgPhyOffset(8byte,消息在文件中的起始位置)+msgSize(4byte,消息在文件中占用的长度)+msgTagCode(8byte,消息的tag的Hash值)。
msgTag是和消息索引放在一起的,所以,消费者根据Tag过滤消息的性能是非常高的。
1、CommitLog文件的大小是固定的,但是其中存储的每个消息单元长度是不固定的,具体格式可以参考org.apache.rocketmq.store.CommitLog中计算消息长度的方法
2、ConsumeQueue文件主要是加速消费者的消息索引。他的每个文件夹对应RocketMQ中的一个MessageQueue,文件夹下的文件记录了每个MessageQueue中的消息在CommitLog文件当中的偏移量。这样,消费者通过ComsumeQueue文件,就可以快速找到CommitLog文件中感兴趣的消息记录。而消费者在ConsumeQueue文件当中的消费进度,会保存在config/consumerOffset.json文件当中。
3、IndexFile文件主要是辅助消息检索。他的作用主要是用来支持根据key和timestamp检索消息。他的文件名比较特殊,不是以消息偏移量命名,而是用的时间命名。但是其实,他也是一个固定大小的文件。
长轮询机制简单来说,就是当Broker接收到Consumer的Pull请求时,判断如果没有对应的消息,不用直接给Consumer响应(给响应也是个空的,没意义),而是就将这个Pull请求给缓存起来。当Producer发送消息过来时,增加一个步骤去检查是否有对应的已缓存的Pull请求,如果有,就及时将请求从缓存中拉取出来,并将消息通知给Consumer。
操作系统对于内存空间,是分为用户态和内核态的。
mmap的映射机制由于还是需要用户态保存文件的映射信息,数据复制的过程也需要用户态的参与,这其中的变数还是非常多的。所以,mmap机制适合操作小文件,如果文件太大,映射信息也会过大,容易造成很多问题。RocketMQ做大的CommitLog文件保持在1G固定大小,也是为了方便文件映射。
sendfile机制在内核态直接完成了数据的复制,不需要用户态的参与,所以这种机制的传输效率是非常稳定的。sendfile机制非常适合大数据的复制转移。
Dleger集群的文件同步机制
在使用Dledger技术搭建的RocketMQ集群中,Dledger会通过两阶段提交的方式保证文件在主从之间成功同步。
Dledger是由开源组织OpenMessage带入到RocketMQ中的一种高可用集群方案。Dledger的主要作用有两个,一是进行Broker自动选主。二是接管Broker的CommitLog文件写入过程。将单机的文件写入,转为基于多数同意机制的分布式消息写入。
简单来说,数据同步会通过两个阶段,一个是uncommitted阶段,一个是commited阶段。
Leader Broker上的Dledger收到一条数据后,会标记为uncommitted状态,然后他通过自己的DledgerServer组件把这个uncommitted数据发给Follower Broker的DledgerServer组件。
接着Follower Broker的DledgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的Dledger。然后如果Leader Broker收到超过半数的Follower Broker返回的ack之后,就会把消息标记为committed状态。
再接下来, Leader Broker上的DledgerServer就会发送committed消息给Follower Broker上的DledgerServer,让他们把消息也标记为committed状态。这样,就基于Raft协议完成了两阶段的数据同步。
原文地址:https://blog.csdn.net/weixin_73195042/article/details/144805409
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!