自学内容网 自学内容网

RocketMQ

rocketmq-all-4.9.2-bin-release
rocketmq-dashboard-1.0.0
Linux安装JDK

一、理论

消息队列的主要作用:
1、异步:提高系统响应率。购物功能,订单服务数据库中货物-1,客户信息与金额放到消息队列,由支付服务去消费信息给客户扣钱
2、解耦:提高系统健壮性。购物功能,订单服务扣的是产品数据库的产品数量,支付服务扣的是客户余额,通过消息队列将购物功能的这两个操作解耦出来,可以提高某个服务的实例数量,提高整个系统的健壮性
3、流量削峰:解决高并发场景下响应过慢的问题。购物功能,加入用户买东西整个业务逻辑中,用户扣款的处理是最慢的,可以将用户信息与扣款金额放到消息队列里,由支付服务消费消息给用户扣款,这样就提高了购物功能的处理时间与响应时间。

概念:
Producer:生产者
Consumer:消费者
Broker:保存消息的通道
NameServer:broker的注册中心
Topic:主题(每个主题对应一类消息)
Queue:队列,用于存放消息,每个Topic可以有多个Queue
Message:消息,消息除配置所在主题与消息体内容外,还可以配置标签Tag与业务标识Key
ProducerGroup:生产者组
ConsumerGroup:消费者组,一个消费者组下的所有消费者只能消费同一类Topic下的消息;其中Topic下的所有队列会均匀分布给每一个消费者消费,所以消费者的数量要低于队列数量

订阅关系一致性:
同一个消费者组下的所有消费者,所订阅的Topic、Tag必须完全一致(可以同时订阅多个Topic,只要保证所有消费者订阅的一致就行)

消费模式:
Push模式:MQ将消息推给消费者端,实时性高,但会导致消费者端崩溃,Push模式也是基于Pull模式
Pull模式:消费者端主动从MQ拉取消息,实时性差,但避免了消费者端崩溃

发送消息类型:
同步消息:生产者发送完消息后,需要等待MQ服务返回成功的响应,消息才算发送成功。针对重要的消息
异步消息:生产者发送完消息后,不会等待MQ服务的响应,但会有回调方法去处理发送成功或者发送失败后的逻辑
单向消息:生产者发送完消息后,不关系发送结果,适合收集日志信息
延迟消息:生产者发送完消息后,该条信息回等待指定时间后才会被消费
批量消息:生产者一次性发送一组消息
顺序消息:生产者发送的某一批消息必须有序执行,要求该批消息必须在同一个队列中,并且消息方式不是异步,而是同步

原理:

二、实践

二.1 安装与启动

将rocketmq-all-4.9.2-bin-release上传至Linux服务器,

# 解压
unzip rocketmq-all-4.9.2-bin-release.zip
# /etc/profile 添加NAMESRV_ADDR地址
export NAMESRV_ADDR=localhost:9876
# 刷新 /etc/profile
source /etc/profile
# 配置JAVA_HOME(参考博客:Linux安装JDK)
# 修改NameServer启动脚本(参考下图)
vim runserver.sh 
# 修改Broker启动脚本(参考下图)
vim runbroker.sh
# 修改broker配置文件
# 添加内容:namesrvAddr=localhost:9876   autoCreateTopicEnable=true   brokerIP1=192.168.84.10
vim broker.conf
# rocketmq-4.9.2目录下创建logs目录,用于保存NameServer,Broker,可视化面板的执行日志
mkdir logs
# 启动NameServer
nohup sh bin/mqnamesrv > ./logs/namesrv.log &
# 查看该进程
jps -l
# 启动Broker
nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &
# 查看该进程
jps -l
# 启动可视化面板(访问地址:http://192.168.84.10:8888/)
nohup java -jar ./rocketmq-dashboard-1.0.0.jar --server.port=8888  --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /micozone/app/rocketmq-4.9.2/logs/dashboard.log &
# 查看该进程
jps -l

修改runerver.sh
修改runserver.sh
修改runbroker.sh
修改runbroker.sh

二.2 消息重复消费问题

问题描述:
负载均衡模式下,某一个消息在同一个消费组中被重复消费。
确保重复消费的场景下,依然能保证幂等性(同一个消息消费一次跟消费多次的效果一样)。
出现原因:
生产者发送消息后,在等待MQ服务的确认响应时,MQ服务突然宕机且该条消息成功保存到从服务器,由于生产者没有收到MQ服务的确认响应,该条消息就会重复发;
消费者消费消息时,在向MQ服务响应处理成功结果之前,消费者组突然扩容了一台消费者客户端,此时队列会被重复分配,由于第一个消费者未向MQ响应处理成功结果,导致扩容的消费者会再次消费到该消息。
解决思路:
生产者发送消息时携带唯一标识key,消费者消费消息时,通过向去重表中插入该key的结果来判断该条消息是否已被消费过,如果报唯一约束字段值重复说明消息已被消费,不走后续逻辑。

二.3 消息进入死信队列问题

问题描述:
MQ有默认的生产者重发次数与消费者重消次数,也可以通过代码配置,其中消费者消费失败后的重新消费次数默认是16次,每次的时间是递增的,跟延迟时间等级一致,当重新消费次数达到上限后,消息会进入死信队列(死信队列所在主题为:%DLQ% + 消费者组名),消息进入死信队列后依然要重新处理
解决思路:
方案一:单独创建一个消费死信队列的消费者组,去对死信队列中的消息进行消费(数据库留痕等)
方案二(推荐):消费者可以获取到该笔消息重新消费的次数,当次数达到某个临界值时,不再重新消费,对该消息进行额外处理


原文地址:https://blog.csdn.net/weixin_51140510/article/details/142662988

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!