消息中间件——rabbitmq,kafka,rocketmq
目录
mq
message Queue,消息队列,分布式应用之间实现异步通信的一种方式
生产者,生产消息,消息的发起方,负责创建承载业务信息的消息
消息服务端,处理消息的单元,用来创建和保存消息队列,负责消息的存储和传递,是mq的核心部分。
消费者,消费信息,是根据消息承载的信息处理业务逻辑。
mq解决什么问题
流量削峰,解决流量过大,业务需要短时间响应的问题,流量过大但是服务器性能无法满足,导致大量请求被积压,出现客户端超时的场景。为了保证高可用,把请求发送给mq,mq再将请求发送给其他服务器,从而平稳的处理后续业务,起到对大流量缓冲的作用。
例如,订单系统最大QPS是1万,这个处理能力正常时段1秒钟返回结果。但是在流量高峰时期,比如促销秒杀,如果QPS到达2万,订单系统就处理不过来,只能在超过负载后不允许用户下单,如果使用消息队列做缓冲,就可以取消这个限制,把超出负载的订单分散到一段时间处理,这样虽然有些用户下单十几秒后才收到下单成功的提示,但是比不能下单要好。
应用解耦,把相关但是耦合度不高的系统联系起来,解决不同系统之间使用不同框架后不同编程语言产生的兼容性问题,提高整个系统的灵活性。
异步处理,应用于实时性不高的场景,例如用户登录发送的验证码,支付成功通知等。生产者只需要将协商好的消息发送给消息队列,之后由消费者处理,不需要等待消费者返回结果。
rabbitmq
工作原理
核心组件,
producter,生产者
consumer,消费者
broker,代理或中介
connection,连接,消息的发送者和接收者都必须同broker建立一个连接
channel,管道,amqp协议里引入管道的概念,相当于一个虚拟连接,我们可以使用已经连接好的tcp长连接,避免了每次发送和接收消息都要创建和释放tcp长连接,这样降低了资源的消耗。不同的channel是相互隔离的。
Queue 队列,用来存储消息,队列是生产者和消费者的纽带,生产者发送的消息会存储到队列里,而消费者从队列里消费消息。
exchange,交换机,消息的路由器,他不存储消息,根据规则分发消息。
binding,绑定,exchange和队列必须建立一个绑定关系,为每个队列指定一个特殊标识。exchange和队列是多对对的关系,一个交换器的消息可以路由给多个队列,一个队列可以接收多个交换机的消息。在绑定关系建立好之后,生产者发送消息到exchange,会携带一个特殊标识。当这个标识与绑定的标识匹配时,消息就会发送给一个或者多个符合规则的队列。
vhost,虚拟主机,解决不同业务系统之间的消息隔离,节约硬件成本,实现资源的隔离和权限控制。
消息路由
rabbitmq是一个基于amqp实现的分布式消息中间件,生产者把消息发送给broker上的exchange。exchange把收到的消息根据路由规则发送给绑定的队列,最后把消息投递给订阅了这个消息的消费者,完成消息的异步通信。
其中交换机定义了消息的路由规则,消息路由到哪个队列。
队列表示消息的载体,每个消息可以根据路由规则路由到一个或多个队列里。
在消息的路由机制里,核心组件是交换机。负责接收生产者的消息,然后把消息路由到消息队列,而消息的路由规则由exchangeType 和 binging决定。binging表示建立队列和交换机之间的绑定关系,每一个绑定关系存在一个bingingkey,通过这种方式在交换机里建立一个路由关系表。
生产者在发送消息的时候,需要声明一个routingkey路由键,交换机拿到路由键之后,与路由表里面的bingingkey进行匹配,匹配方式由exchangeType决定。
exchange的类型,1.direct,完整匹配,路由键和绑定键完全一致,相当于点对点发送
2.fanout,广播,把消息广播给绑定在交换器上的所有队列。
3.topic,正则匹配,根据路由键使用正则表达式匹配bingingkey,符合匹配规则的队列收到这个消息。
如何保证消息不丢失
消息的传递过程,
1.生产者发送消息到mq,为了保证mq服务器收到生产者发送的消息,使用消息确认机制,生产者需要接收一条确认消息已被接收的通知。否则,就重新发送消息。通过channel的confirmselect开启发送方确认模式。
handleACK方法,表示broker接收到了消息,handleNack,表示消息丢失。这里可以做重试机制,例如一条消息发送3次还没有成功,就认为发送失败了。如果这个消息非常重要不能丢失,例如订单消息,我们这类消息放到数据库进行持久化。给每条消息设置一个状态标识,假设初始状态是0,服务器接收之后,触发回调,这时把这条消息的状态设置1.表示已收到。
补偿机制,用一个定时任务轮询数据库里状态为0的消息,并重新发送。设置一个最大重试次数,防止一条消息不断重试,消耗内存资源降低数据库性能。
但是不断轮询数据库会增加数据库压力,每发送一条消息都有和数据库进行交互,更新消息的状态,优化方法,1.创建一个专用的数据库处理发送失败的消息。
2.采用Redis缓存更新消息状态,减少轮询数据库压力。
mq存储的信息不丢失,
开启mq的持久化机制,基于raid刷盘,2种模式
raid0,磁盘集成,将多块磁盘当做一块使用。这种模式可以提高磁盘存储量。
raid1,磁盘镜像,将磁盘分成2半,将同样的数据在2块区域里各存储一份。这样避免了在消息存储的过程里,因为某一块磁盘道坏了,导致消息丢失。
消费者保证消息不丢失,
1.在处理消息的过程里出现了异常,导致消息丢失
2.消费者因为网络中断或者网络抖动,没有接收到消息
3.消费者刚接收到消息,服务器宕机了,导致后续操作中断。
使用信息确认机制。只需要消费者在收到消息后手动确认。当mq没有收到消息确认的时候重发消息。
实现高可用
高可用在分布式系统里,如果出现某些节点不可用的情况,保证客户端能够连接其他节点,不影响业务执行。实现高可用用2种集群部署和负载均衡。
集群部署,集群节点有2种,磁盘节点,内存节点,在集群环境里至少需要一个磁盘节点,用来持久化元数据,避免内存节点都崩溃时,无法同步元数据
集群模式有2种,普通集群模式,不同节点之间只会相互同步元素数据,不同同步消息内容。好处是可以节省存储和同步数据的网络开销。如果需要保证队列的高可用,需要开启镜像队列模式。
镜像队列模式,消息内容在所有镜像节点同步,可用性更高。但系统性能下降,在节点过多的情况下同步数据的代价比较大。
集群搭建成功后,保证高可用,使用负载均衡组件来做路由。对于生产者和消费者只需要连接到组件的虚拟IP地址就可以了。
1.可以监控集群里节点的状态,如果某个节点发生异常或故障把他剔除。
2.为了提高可用性,部署多个服务,能够自主完成主从选举
3.master需要对外提供一个虚拟IP地址,客户端只需要连接到这个虚拟的IP地址就可以完成真实的IP地址路由。
kafka
一个能处理超千万亿消息吞吐量的实时消息处理平台。
能支持这么大吞吐量的原因
磁盘顺序读写,磁盘的盘片不停的旋转,磁头在磁盘表面画出一个圆形轨迹,叫做磁道,由内到外,因为半径不同,所以有许多磁道,用半径线,把磁道分割成扇区。如果需要读写,就需要找到数据对应的扇区,这个过程叫寻址。随机io,读写数据在磁盘上分散存储,寻址比较耗时。顺序io,读写数据在磁盘上集中,不需要重复寻址。
kafka的消息是不断追加到磁盘文件的末尾,是顺序写入,这样提高了吞吐量。
稀疏索引,插入一批消息才会产生一条索引记录。后续使用二分法查找,可以提高检索效率。
批量文件压缩,默认不会删除数据,把所有的消息变成一个批量文件,把相同key合并成一个value,这样对消息进行合理的批量压缩,减少网络io。
零拷贝机制,kafka文件传输使用Java nio库里transferto方法,这个方法使用Linux系统调用sendFile函数,sendfile函数,实现了零拷贝,不需要用户经过缓冲区,可以直接把数据拷贝到网卡。
把磁盘里的文件发送到远程服务器的过程
1.从磁盘里读取目标文件的内容,拷贝到内核空间的缓冲区。
2.cup把内核空间的数据拷贝到用户空间缓冲区。
3.在应用里调用Write方法,把用户空间缓冲区的数据拷贝到内核空间的socket缓冲区。
4.把socket缓冲区里的数据拷贝到网卡
5.网卡把数据传输到目标服务器上。
这个过程需要拷贝4次,其中2次拷贝是浪费的
1.从内核空间拷贝的用户空间
2.再从用户空间拷贝到内核空间
由于用户空间和内核空间切换带来cup的上下文切换,影响了CPU的性能。零拷贝把这2次拷贝省略掉,应用程序直接把磁盘里的数据从内核传输到socket缓冲区,不需要经过用户空间。
如何保证消息不丢失
kafka是一个用来实现异步消息的中间件
producer,需要保证消息能够到达broker实现消息存储。默认采用的是异步发送,要确保消息能够发送成功,有2个方法,
1.把异步消息改成同步消息,这样producer能够知道消息发送结果。
2.添加异步回调函数监听消息发送结果。如果发送失败可以重试。
producer 提供重试参数retries,因为网络或者broker故障,producer自动重试。
broker保证收到的消息不丢失,只需要把消息持久化到磁盘。kafka采用异步批量刷盘实现,按照一定的消息量和时间间隔来刷盘,刷盘动作由操作系统调用,如果在刷盘之前系统破溃了,就会导致数据丢失。
kafka没有同步刷盘机制,这里采用分区副本和acks机制解决。
分区副本机制,是针对每个数据分区的高可用策略。每个分区副本集都包含唯一的leader和多个follower,leader处理事务请求,follower处理leader同步数据。
acks参数,producer可以设置acks参数,结合broker副本机制来共同保证数据可靠性,1.acks=0,表示producer不需要等待broker响应,就认为消息发送成功。
2.acks=1,表示broker里的leader分区在收到消息后,不在等待其他follower分区同步完成,就给producer就返回确认,这种情况下,如果leader 分区挂了,导致消息丢失。3.acks=-1,表示broker里leader分区已收到消息,并且isr列表里的follower分区同步完成,再给生产者返回确认,这样可以保证消息的可靠性。
保证消费者收到消息,如果消费者没有消费完这个消息就提交了,还可以通过调整offset重新消费。
避免重复消费
kafka存储的消息都有一个offset,kafka的消费是通过offset值来维护当前已经消费的数据。每消费一批消息,就会更新offset值,避免重复消费,默认情况下,在消息消费完成后,kafka会自动提交offset,消费端的自动提交有5秒间隔,在消费者消费的过程里,应用程序关闭或者机器宕机,这样offset没有提交,导致重复消费问题。
分区再平衡机制,把多个分区均匀的分配给消费者,消费者从分区消费消息,如果消费者在5分钟里没有消费完这批消息,就会触发再平衡机制,导致offset提交失败。再平衡之后,消费者还是从之前没有提交的offset开始消费,导致重复消费问题。
解决方法,
1.提高消费者的性能避免触发再平衡,使用异步方式处理消息,缩短单个消息消费时间,调整消息处理的超时时间,减少一次性从broker里获取消息的数量。
2.针对消息生成MD5,保存到数据库里,在处理消息之前,先去查询下消息没有有被消费。
如何保证消息顺序消费
kafka使用分区机制存储消息,同一个topic,可以维护多个分区实现消息分片,生产者在生产消息的时候,会根据消息的key进行取模,决定把消息存储到哪个分区里,消息是按照先后顺序有序的存储到分区里
假设topic有3个分区,消息正好被路由到3个独立的分区里面,消费端有3个消费者通过再平衡机制分别被指派了对应的分区,因为消费者是独立的网络节点,所以消费的顺序与发送的顺序不同,出现了乱序消费问题。
解决方法,自定义消息分区路由算法,把指定的key发送给同一个分区,指定特定的消费者来消费某个分区数据,保证消费的顺序。
数据存储原理
kafka存储消息的队列叫做topic,是一个逻辑概念,可以理解为一组消息的集合。topic,生产者,消费者是多对对的关系,一个生产者可以发送消息到多个topic,一个消费者可以从多个topic获取消息。
为了实现横向扩展,kafka会把不同的数据存放在不同的broker上,同时为了降低单体服务器的访问压力,会把一个topic的数据分割成多个分区,在服务器上每一个分区都有一个物理目录,topic名字后面的数字编号代表分区。例如,创建一个mytopic主题,数据目录存储到了3台机器上。mytopic0存储在A节点,mytopic1存储在B节点,mytopic2存储在C节点
为了提高分区的可靠性,设计了副本机制,在创建topic时,通过设定replication-factor副本因子确定副本数量。副本因子必须小于副本数,为了保证不会有1个分区的2个副本存储在一个节点上。所有的副本有2种角色,leader对外提供读写服务,follower从leader异步拉取数据。
为了防止log不断追加导致文件过大,检索消息的效率降低,当一个分区超出一定的大小的时候,会被切割成多个segment,在磁盘上每个segment由一个。log文件,index文件,timeindex文件组成。
log文件用来存储具体的数据文件。
index文件用来存储消费者的offset的索引文件,
Timeindex文件用来存储时间戳的索引文件,
并以切割时记录的offset值作为文件名。
kafka设计2种索引文件,1.偏移量索引文件,记录offset和消息在log文件里的映射关系。
2.时间戳索引文件,记录时间戳和offset的关系。
为了提高检索效率,kafka采用的是稀疏索引,间隔一批消息才产生一条索引记录。
我们可以通过参数设置索引的稀疏程度,索引越密集检索的效率越快,但是占用磁盘空间越多,越稀疏的索引占用的磁盘空间越小,在插入和删除时所需要维护开销越小。kafka在检索数据时,采用的是二分查找法,效率高。
IRS
消息发送到broker上,以分区的形式存储在磁盘上,为了保证分区的可靠性,提供副本机制。在分区的副本里,有leader和follower2种分区,生产者的消息首先存储到leader分区,然后复制到follower分区,这样设计的好处是一旦leader分区的节点挂了,可以重新从剩余的分区里面选举出新的leader分区。这样消费者可以从新的leader分区获取未消费的数据。
在分区副本机制里有2个重要功能,副本数据同步和leader选举。这些功能涉及到网络通信,为了避免通信延迟带来的性能问题,尽可能保证新选出来的leader分区的数据时最新的,设计了ISR方法,全称是in sync replica,是一个列表,保存的是和leader分区节点数据最接近follower分区。如果这个follower分区里面的数据落后leader分区太多,从isr列表删除,那么在列表里面的节点,同步数据是最新的,所以后续的leader选举,只需要从ISR列表进行筛选就可以了。
IRS优点,1.尽可能保证了数据同步的效率,因为同步效率不高的节点已被删除。
2.避免数据丢失。ISR列表里的节点的数据是和leader最接近的。
leader选举
早期版本使用zookeeper完成选举,利用zookeeper的watch机制,所以kafka节点不允许重复写入,但允许设置临时节点。这样实现简单,但是存在一些问题,例如当分区和副本数量过多时,如果所有副本都直接参与选举,一旦出现节点增减,就会造成大量的watch事件触发,导致zookeeper负载过重。
新版本里的实现方法,不是所有的replica数据集都参与leader选举,而是由其中的一个broker统一指挥,他叫做controller。kafka先选出控制器。
所有的broker会尝试在zookeeper里创建临时节点/controller,谁先创建成功,谁就是控制器。如果控制器挂掉或者出现网络问题,zookeeper上的临时节点就会消失。其他broker通过watch监听到控制器下线后,然后重新选出控制器,这个控制器相当于选举委员会主席。
当一个节点成为控制器后,就会监听,管理broker,topic,partition的信息。
控制器确定后,就可以开始分区选举了,只有ISR保持心跳同步的副本才有资格参与选举,让ISR里第一个replica变成leader,例如,isr是1,3,5,优先让1成为leader。
rocketmq
为什么不使用zookeeper
rocketmq使用nameServer实现broker注册和发现,为了保证高可用,nameServer可以集群部署,broker在nameServer上注册自己,生产者和消费者使用nameServer发现broker,每个broker节点在启动的时候,根据配置遍历nameServer列表,与每个nameServer建立长连接,注册自己的信息,之后每隔30s发送心跳信息。每个nameServer每隔10S会检查一些各个broker的最近一次心跳时间,如果发现某个broker超过120s都没有发送心跳信息,就把这个broker从路由信息删除。
rocketmq是一个保持最终一致性的架构设计,他的架构决定了他只需要一个轻量级元数据服务器就够了,不需要像zookeeper这样强一致性解决方案。不依赖另一个中间件可以减少整体的维护成本。根据cap理论,zookeeper选择cp,而nameServer选择ap
分布式事务
半消息,是一种暂时不能投递给消费者的消息,消息生产者已经将消息成功发送到mq服务器,但是mq服务器没有收到生产者对这条消息的二次确认,这时这条消息被标记为暂时不能投递。
消息回查,由于网络阻塞或生产者应用重启等原因,导致某条事务消息的二次确认丢失,mq服务器通过扫描发现某条消息长期处于半消息状态时,就主动向消息的生产者询问该消息的最终状态,要么commit,要么rollback.
生产者向mq服务器发送半消息
mq服务器将消息持久化成功之后,向生产者发送ACK,确认消息已经发送成功,这个时候消息标记为半消息。
生产者开始执行本地事务
生产者根据本地事务执行结果向mq提交二次确认。如果mq收到commit状态,将消息标记为可投递,向消费者发送消息,如果mq服务器收到回滚,则删除半消息,消费者将不会收到该消息。
如果断网或生产者应用重启的情况下,提交的二次确认没有到达mq服务器,经过固定时候后,mq服务器将对该消息发起消息回查。
生产者收到消息回查后,检查对应消息的本地事务执行的结果
生产者根据检查得到的本地事务执行的最终结果再次提交二次确认,如果mq收到commit状态,将消息标记为可投递,向消费者发送消息,如果mq服务器收到回滚,则删除半消息,消费者将不会收到该消息。
原文地址:https://blog.csdn.net/z524635690/article/details/144649884
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!