SpringCloud Alibaba五大组件之——RocketMQ
SpringCloud Alibaba五大组件之——RocketMQ(文末附有完整项目GitHub链接)
前言
前文,我们已经介绍了SpringCloud Alibaba五大组件中的三个:dubbo、nacos、sentinel,文章连接:
1.太细了有手就行,SpringCloud Alibaba+Nacos+Dubbo整合
2.SpringCloud Alibaba五大组件之——Sentinel
有需要的可以去查阅。
ps:本文用到的项目demo也是基于以上两篇文章去扩展的,包括模块结构和版本依赖等等,只是Git分支不一样。
1.本文默认读者已经知晓了队列的基本常识,包括topic(主题),队列,生产者,消费者等等。
2.本文会用最常见的两种队列模型:广播队列和延迟队列来举例
3.本文用的rocketmq版本为:5.3.0,SpringCloud alibaba版本用的是2021.0.6.0毕业版本,其他相关的都是配套使用,具体可以查看前面两篇文章
一、安装RocketMQ(安装好了的直接跳过本节)
(1)用docker安装RocketMQ以及console
1.拉取MQ镜像
docker pull apache/rocketmq:5.3.0
2.拉取控制台镜像,这里有两个版本
docker pull styletang/rocketmq-console-ng
//或者官方版本
docker pull apacherocketmq/rocketmq-dashboard:latest
3.找一个合适的安装位置,创建broker.conf文件
vim broker.conf
4.填充内容如下:
# 集群名称
brokerClusterName = DefaultCluster
# 节点名称
brokerName = broker-a
# broker id节点ID, 0 表示 master, 其他的正整数表示 slave,不能小于0
brokerId = 0
# 在每天的什么时间删除已经超过文件保留时间的 commit log,默认值04
deleteWhen = 04
# 以小时计算的文件保留时间 默认值72小时
fileReservedTime = 72
# Broker角色
brokerRole = ASYNC_MASTER
# 刷盘方式
flushDiskType = ASYNC_FLUSH
# Broker服务地址,内部使用填内网ip,如果是需要给外部使用填公网ip,自行更改
brokerIP1 = 172.16.72.133
5.修改读写权限
//如果当前目录有权限,-R可以不要,它的意思是递归修改文件夹及子文件所有权限
chmod 777 -R broker.conf
6.在当前目录下,创建docker-compose.yml文件,在这之前我没用compose,但是每次都三个,用docker run,docker start太麻烦了,用compose一键搞定方便快捷,docker-compose指令需要下载,参考我的这篇文章:linux安装docker-compose
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.3.0
container_name: rmqnamesrv
ports:
- 9876:9876
networks:
- rocketmq
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.3.0
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- ./broker.conf:/home/rocketmq/rocketmq-5.3.0/conf/broker.conf
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
depends_on:
- namesrv
networks:
- rocketmq
command: sh mqbroker
proxy:
image: apache/rocketmq:5.3.0
container_name: rmqproxy
networks:
- rocketmq
depends_on:
- broker
- namesrv
ports:
- 8080:8080
- 8081:8081
restart: on-failure
environment:
- NAMESRV_ADDR=rmqnamesrv:9876
command: sh mqproxy
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8090:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- namesrv
networks:
- rocketmq
networks:
rocketmq:
driver: bridge
启动后,浏览器访问:127.0.0.1:8090,就能登录网页版控制台了,127.0.0.1记得换成你的linux服务器ip,8090是因为我映射了。
(2)普通安装RocketMQ以及console
- 下载二进制包
下载链接:https://dist.apache.org/repos/dist/release/rocketmq/5.3.0/rocketmq-all-5.3.0-bin-release.zip - 启动namesrv
### 启动namesrv
$ nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
- 启动Broker+Proxy
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log
The broker[broker-a,192.169.1.2:10911] boot success...
- 下载控制台源码,下载并解压,切换至源码目录 rocketmq-dashboard-master/
源码:https://github.com/apache/rocketmq-dashboard - 编译
$ mvn clean package -Dmaven.test.skip=true
- 运行
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
ps:这里需要提一点,启动server和broker的默认内存很大,两个都要改,如果你要调小,需要在bin下面的runserver.sh修改启动参数:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
把这些JAVA_OPT="${JAVA_OPT}开头的都改成256, 这个大小你随意,默认是4g,如果服务器内存小了会报错启动不起来
二、Spring Cloud Steam集成RocketMQ
1.Spring Cloud Steam是什么
不知道大家有没有一种感觉,现在的各种官方文档,说人话的越来越少了,给你整一大堆特别拗口和官方的专业术语,看都看不懂,看似很高端,实则没啥luan用。
我来用白话文给大家解释一下:Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,它就是用来整合各种消息队列的,比如常见的Kafka,RabbitMQ,RocketMQ等等,它的宗旨就是简化配置,让开发者专注业务开发,而不是队列本身的细节。
Spring Cloud Stream内部有两个重要的东西:Binder 和 Binding
- Binder :整合队列的,包含了队列的基本配置,它的层级是Kafka,RabbitMQ,RocketMQ这一级
- Binding:每个队列内部的配置,比如我的Binder 是RocketMQ,那么Binding就是的RocketMQ的一些配置。
大家先了解一下就好,后面我会对配置作专门详细的讲解
2.用Spring Cloud Steam实现广播队列
1. 增加依赖,这一个就够了,其他的不需要
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
2. 新增topic:broadcast,5.0版本之后,官方推荐用mqadmin去新增和修改topic
sh mqadmin updatetopic -n localhost:9876 -t broadcast -c DefaultCluster
如果你是用docker部署的mq,则要进入到broker内部
docker exec -it rmqbroker bash
sh mqadmin updatetopic -n localhost:9876 -t broadcast -c DefaultCluster
解释一下这个操作,标准模板为:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=<message_type>
- -n:代表你的nameserver的地址 ,也就是前面小节中启动namesrv,如果你不指定,默认的ip端口就是localhost:9876
- -t:你需要创建或者修改的主题名字
- -c:集群名字,默认是DefaultCluster
- -a +message.type=:这个-a和message.type一定要连着一起用中间的+号也是,表示消息的类型,不设置的话默认是normal。
既然说到了消息的类型,顺带提一嘴消息有哪些类型:
-
Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
-
FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
-
Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
-
Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。
PS:本文用到的都是默认的normal类型,虽然没有创建特殊messageType的队列,但是我们通过代码和配置,同样能实现延时消息、顺序消息等功能。
3. 自定义一个消息体SimpleMsg
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
@AllArgsConstructor
@Data
public class SimpleMsg implements Serializable {
private String msg;
}
4. 新建controller,模拟消息生产者
5.x版本以上不需要在启动类添加@EnableBinding注解
import com.wq.beans.SimpleMsg;
import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
@RestController
@RequestMapping("/mqtest")
public class RocketMqTestController {
@Autowired
private StreamBridge streamBridge;
@RequestMapping("/test1")
public void testOne() {
String key = "KEY";
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, 1);
Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("broadcastMessage"), headers);
streamBridge.send("broadcastMessage-out-0", msg);
}
}
5. 配置文件application.yml,后续再讲解配置的具体含义和用法
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 172.16.72.133:9876
bindings:
broadcastMessage-out-0:
destination: broadcast
6. 消费者代码
在两个不同模块下(同模块也行,不过配置文件就揉在一起了)创建两个消费者,两个消费者代码基本相同,我就贴一个就好
消费者代码:在任意一个可以自动注入的类中写就行,这里测试我就放在了启动类中。
@Bean
public Consumer<Message<SimpleMsg>> broadcastMessage() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
};
}
7. 消费者配置文件
spring:
cloud:
stream:
function:
definition: broadcastMessage
rocketmq:
binder:
name-server: 172.16.72.133:9876
bindings:
broadcastMessage-in-0:
consumer:
messageModel: BROADCASTING
bindings:
broadcastMessage-in-0:
destination: broadcast
group: broadcast-consumer
8. 验证
浏览器请求我们前面定义的测试接口:http://localhost:8080/mqtest/test1
看看生产者控制台打印:说明消息发送成功了
Invoking function: streamBridge<org.springframework.messaging.Message<java.lang.Object>, org.springframework.messaging.Message<java.lang.Object>>with input type: org.springframework.messaging.Message<java.lang.Object>
看看消费者控制台打印:
2024-09-26 19:41:27.444 INFO 91392 --- [cast-consumer_1] com.wq.AccountApplication : ConsumeMessageThread_broadcast-consumer_1 Consumer1 Receive New Messages: broadcastMessage
3.用Spring Cloud Steam实现延迟队列
1. 生产者代码
要实现延迟队列,我们要做的非常简单,让我们回到上一节的第5点,创建controler的时候,我们往header添加参数:MessageConst.PROPERTY_DELAY_TIME_LEVEL,新建一个test2接口
@RequestMapping("/test2")
public void testTwo() {
String key = "KEY";
Map<String, Object> headers = new HashMap<>();
headers.put(MessageConst.PROPERTY_KEYS, key);
headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, 1);
//延迟队列的参数等级
headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 delayMessage"), headers);
logger.info("发送消息:我是 delayMessage");
streamBridge.send("delayMessage-out-0", msg);
}
2. 通过这种方式设置的延时队列,是分等级的
本文设置level为2,就是延迟5秒,大家可以按照下标来选择
1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
有人问如果我要精确设置时间呢?请看后面第三大点的补充说明
3. 生产者配置文件新增:delayMessage-out-0
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 172.16.72.133:9876
bindings:
broadcastMessage-out-0:
destination: broadcast
delayMessage-out-0:
destination: delaymessage
4. 消费者代码
一模一样,只是方法名字变了,这个后面会解释
@Bean
public Consumer<Message<SimpleMsg>> delayMessage() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer Receive New Messages: " + msg.getPayload().getMsg());
};
}
5. 消费者配置文件新增:delayMessage-in-0
spring:
cloud:
stream:
function:
definition: broadcastMessage;delayMessage
rocketmq:
binder:
name-server: 172.16.72.133:9876
bindings:
broadcastMessage-in-0:
consumer:
messageModel: BROADCASTING
delayMessage-in-0:
consumer:
messageModel: BROADCASTING
bindings:
broadcastMessage-in-0:
destination: broadcast
group: broadcast-consumer
delayMessage-in-0:
destination: delaymessage
group: delay-consumer
6. 经过测试,生产者生成消息后5秒,消费者才拿到消息消费。
生产
2024-09-27 15:35:07.053 INFO 15200 --- [nio-8080-exec-1] c.wq.controller.RocketMqTestController : 发送消息:我是 delayMessage
Invoking function: streamBridge<org.springframework.messaging.Message<java.lang.Object>, org.springframework.messaging.Message<java.lang.Object>>with input type: org.springframework.messaging.Message<java.lang.Object>
消费
2024-09-27 15:35:12.087 INFO 36580 --- [elay-consumer_1] com.wq.AccountApplication : ConsumeMessageThread_delay-consumer_1 Consumer Receive New Messages: 我是 delayMessage
4.配置详解
我就拿以下这个配置为例来讲解:
spring.cloud.stream 这是所有配置前缀,就不多说了,我们主要说下stream 下面的一些定义
spring:
cloud:
stream:
function:
definition: broadcastMessage;delayMessage #方法声明
rocketmq:
#rocketmq配置
binder:
name-server: 172.16.72.133:9876 #server地址
bindings:
#对每一个topic的细化配置,注意命名
broadcastMessage-in-0:
#对每一个topic的consumer细化配置
consumer:
#消费者的消费模式为广播模式
messageModel: BROADCASTING
delayMessage-in-0:
consumer:
messageModel: BROADCASTING
#stream的全局配置
bindings:
#初次对主题进行一个设置
broadcastMessage-in-0:
#绑定的主题topic
destination: broadcast
group: broadcast-consumer
delayMessage-in-0:
destination: delaymessage
group: delay-consumer
1. spring.cloud.strean.function.definition
你可以理解为这个就是方法名,在我们代码中对应的就是消费者的方法名:broadcastMessage
@Bean
public Consumer<Message<SimpleMsg>> broadcastMessage() {
return msg -> {
log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg());
};
}
注意这个要和下面的bindings的key前缀相同,比如:broadcastMessage-in-0 和broadcastMessage-out-0
2. spring.cloud.strean.function.rocketmq
不知道大家还有印象没,在前面介绍strean的时候说过,strean有两个很重要的组成,一个是binder,一个是binding,为啥这里配置文件只有binding没有binder呢?不要慌我这就道来,因为strean是支持集成很多队列的,如果我们项目中遇到两种以上的队列,比如集成了rocketmq,又集成了kafka,那么这时候binder就起作用了,在这里声明这两个队列
但是我们大部分时候只需要集成一个队列,所以这里可以简写,直接省略binder,直接配置rocketmq:spring.cloud.strean.function.rocketmq,这里就可以理解为是binder下面中的一个队列:rocketmq。
那下面就来说一下rocketmq下的一些配置含义:
-
spring.cloud.strean.function.rocketmq.binder:这个类可以理解为rocketmq整体的配置,它可以配置整个rocketmq的一些公共并且最上层的一些信息,我们这里只需要用到name-server:RocketMQ NameServer 地址。binder下面可以总体设置producer和consumer的一些属性
-
spring.cloud.strean.function.rocketmq.bindings:这个是单独对某个主题的一些补充,是对binder的细化,它的优先级最高,建议大家结合第三点spring.cloud.strean.bindings一起来理解
在配置中我们是这样配置的,其中broadcastMessage-in-0是每个主题的细化,下面第三点会讲,这里可以单独设置每个主题中consumer的属性,这里的messageModel: BROADCASTING代表消费者以广播的形式消费该消息,如果不设置messageModel,那么只会轮询其中的一个消费者来消费消息bindings: broadcastMessage-in-0: consumer: messageModel: BROADCASTING delayMessage-in-0: consumer: messageModel: BROADCASTING
3. spring.cloud.strean.bindings
bindings:
broadcastMessage-in-0:
destination: broadcast
group: broadcast-consumer
delayMessage-in-0:
destination: delaymessage
group: delay-consumer
这是rocketmq声明绑定主题topic的地方。
stream会将方法拆分为两个,一个in一个out
- functionName + -in- + index
- functionName + -out- + index
格式讲解:functionName就是我们的方法名,in和out是新版stream默认的,对应着生产者和消费者,照着写就行了,至于后面的下标,是代表着一个消息的消费顺序,没有特殊要求,一个消息只会有一个对应的消费者,所以基本都是0
- broadcastMessage-in-0:这就是bindings的一个key,可以平行的设置多个
- destination:代表我们的topic,这里一定要和mqadmin设置的topic一致
- group:消费组,5.x以后,生产者没有分组的概念,但是消费者又分组的概念,分组之后,即使是同样的消费者配置,只要group不同,都可以拿到消息进行消费。
ps:以下四点命名一致性:
1.spring.cloud.strean.bindings中key
2.spring.cloud.strean.function.definition中的声明要和第一点的前缀相同
3.rocketmq中的bindings的key要和第一点完全一致
4.具体消费实现的方法名,也就是前面打上@Bean注解那个Consumer方法,要和第一点的前缀相同
4. 总结
这几点有点绕,大家可以结合配置中的注解,多读几次加强理解,我再给大家做个总结:
(1)spring.cloud.strean.function.definition是单独的,只需要和下面命名保持一致就行,
(2)spring.cloud.strean中的binder,用spring.cloud.strean.function.rocketmq代替了
(3)spring.cloud.strean中的bindings 是主题声明,并且这一层可以第一次对每一个topic做一些统一配置,它的优先级最低。
(4)spring.cloud.strean.function.rocketmq是rocketmq的全局配置,他的配置优先于(3)中的配置
(5)spring.cloud.strean.function.rocketmq.bindings :这是最细化的一层配置,优先级最高,是对每一个topic的补充配置
三、关于用Spring Cloud Stream和SpringBoot集成RocketMQ的一些补充说明
1.延迟队列的精确时间
在rocketmq 5.x版本之前,延时队列只能通过level来设置时间,如果你用的是cloud alibaba版本,并且版本支持不是5.x的,需要在Maven依赖中剔除rocketmq 的依赖,并且重新引入5.x版本的
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>5.3.0</version>
</dependency>
在代码中,重新定义引入message的类为:org.apache.rocketmq.common.message.Message,以前用的是:org.springframework.messaging.Message。从新定义message,并且通过setDelayTimeMs或者setDelayTimeSec等方法来精确设置延时时间。
Message message = new Message();
message.setDelayTimeMs(3500);
message.setDelayTimeSec(3500);
message.setDeliverTimeMs(3500);
2.选择问题
如果你是比较专业的团队,对rocketmq掌握的比较深,而且有定制化的需求,那么我推荐你用springboot版本集成rocketmq
如果你只是需要简单的使用rocketmq,没有那么多自定义属性需求,需要快速开发,专注于业务,忽略rocketmq的队列细节,我推荐你使用Spring Cloud Stream的方式集成rocketmq
四、结语
最后,博主也是初次接触rocketmq,可能有写东西不是特别清楚明白,后面也会持续学习,更新这篇文章,如果大家有什么见解欢迎指出,博主也会及时修改文章,这么大一篇文章都看完了,点个赞不过分吧。
本文完整项目代码GitHub地址
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git
原文地址:https://blog.csdn.net/StreamlineWq/article/details/142457703
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!