制作一个rabbitmq-sdk
目录结构
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.aasee</groupId>
<artifactId>sms-modules</artifactId>
<version>3.6.3</version>
</parent>
<artifactId>sms-rabbitmq-starter</artifactId>
<name>sms-rabbitmq-starter</name>
<description>
rabbitmq-sdk
</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- SpringBoot Boot rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- skywalking -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>9.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-trace</artifactId>
<version>9.3.0</version>
</dependency>
</dependencies>
</project>
com.aasee.rabbitmq.configure.Callback
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@Slf4j
public class Callback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
*
* @param correlationData correlation data for the callback. 相关配置信息
* @param ack true for ack, false for nack exchange交换机 是否成功收到了消息。true代表成功,false代表失败
* @param cause An optional cause, for nack, when available, otherwise null. 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm方法被执行了!!!!! correlationData:{}",correlationData);
if (ack){
// 接受成功
log.info("接受消息成功! correlationDataId: {} ,cause: {} " ,correlationData.getId(), cause);
}else {
// 接受失败
log.error("接受消息失败! correlationDataId: {} ,cause: {} " ,correlationData.getId(), cause);
}
}
/**
* 回退模式:当消息发送给Exchange后, Exchange路由到Queue失败时 才会执行 ReturnCallBack
* 步骤:
* 1. 开启回退模式: publisher-returns: true #是否开启发送端消息抵达队列的确认
* 2. 设置ReturnCallBack
* 3. 设置Exchange处理消息的模式:
* 1. 如果消息没有路由到Queue,则丢弃信息(默认)
* 2. 如果消息没有路由到Queue,返回给消息发送方ReturnCallBack 设置mandatory为true
*
*/
/**
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("return机制方法被执行了。。。。。");
log.info("消息(message):" + returned.getMessage());
log.info("退回原因代码(replyCode):" + returned.getReplyCode());
log.info("退回原因(replyText):" + returned.getReplyText());
log.info("交换机(exchange):" + returned.getExchange());
log.info("路由Key(routingKey):" + returned.getRoutingKey());
// TODO 处理未到Queue的数据(或者使用备份交换机)
}
}
com.aasee.rabbitmq.configure.CustomMessageInterceptor
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.stereotype.Component;
/**
* 自定义消息拦截器
*
* @author Aasee
* @date 2024-03-11
*/
@Component
public class CustomMessageInterceptor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) {
// 获取原始消息属性
MessageProperties properties = message.getMessageProperties();
// 设置新的消息头部信息(如果有需求)
// properties.setHeader("loginUser", SecurityUtils.getLoginUser());
return new Message(message.getBody(), properties);
}
}
com.aasee.rabbitmq.configure.RabbitMQConfig
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置
*
* @author Aasee
* @date 2024-02-24
*/
@Configuration
public class RabbitMQConfig {
/**
* rabbitmq Redis 防止重复消费键
*/
public final static String RABBITMQ_REDIS_KEY="RabbitMq_Send_Sms:";
public final static String RABBITMQ_REISSUE_KEY="RabbitMq_Reissue_Sms:";
public final static String RABBITMQ_CALLBACK_KEY="RabbitMq_Callback_Sms:";
//--------------交换机名称-----------------------------------
public static final String EXCHANGE_NAME = "aasee_topic_exchange";
//---------------队列名称------------------------------------
public static final String QUEUE_NAME = "aasee_queue";
public static final String SMS_QUEUE_NAME = "cloud_sms_queue";
public static final String REISSUE_QUEUE_NAME = "sms_reissue_queue";
public static final String CALLBACK_QUEUE_NAME = "callback_sms_queue";
//--------------路由键名称-----------------------------------
public static final String ROUTING_KEY = "aasee.#";
public static final String SMS_ROUTING_KEY = "sms.#";
public static final String REISSUE_ROUTING_KEY = "reissue.#";
public static final String CALLBACK_ROUTING_KEY = "callback.#";
//------------------------交换机--------------------------
// 交换机
@Bean(value = "aaseeExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//------------------------队列--------------------------
// 队列
@Bean("aaseeQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
// SMS 队列
@Bean("cloudSmsQueue")
public Queue smsQueue(){
return QueueBuilder.durable(SMS_QUEUE_NAME).build();
}
// SMS 补发 队列
@Bean("smsReissueQueue")
public Queue smsReissueQueue(){
return QueueBuilder.durable(REISSUE_QUEUE_NAME).build();
}
// 短信回调 队列
@Bean("callbackSmsQueue")
public Queue callbackSmsQueue(){
return QueueBuilder.durable(CALLBACK_QUEUE_NAME).build();
}
//-------------------路由绑定-------------------------
// 队列和交换机绑定关系 Binding
/*
1. 知道哪个队列
2. 知道哪个交换机
3. routing key
*/
// 路由绑定
@Bean
public Binding bingQueueExchange(@Qualifier("aaseeQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
}
// SMS 路由绑定
@Bean
public Binding bingSmsQueueExchange(@Qualifier("cloudSmsQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(SMS_ROUTING_KEY).noargs();
}
// 补发 路由绑定
@Bean
public Binding bingSmsReissueQueueExchange(@Qualifier("smsReissueQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(REISSUE_ROUTING_KEY).noargs();
}
// 短信回调 路由绑定
@Bean
public Binding bingCallbackSmsQueueExchange(@Qualifier("callbackSmsQueue") Queue queue, @Qualifier("aaseeExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(CALLBACK_ROUTING_KEY).noargs();
}
// Mq模板类
@Bean
//设置rabbitTemplate的scope为:prototype
// @Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setBeforePublishPostProcessors(new CustomMessageInterceptor());
//成功回调
template.setConfirmCallback(new Callback());
// 开启mandatory模式(开启失败回调)
template.setMandatory(true);
//失败回调
template.setReturnsCallback(new Callback());
return template;
}
}
com.aasee.rabbitmq.service.RabbitMqService
import com.alibaba.fastjson2.JSON;
import com.aasee.rabbitmq.configure.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* Rabbit MQ 工具类
*
* @author Aasee
* @date 2024-03-11
*/
//@SuppressWarnings(value = {"unchecked", "rawtypes"})
@Component
@Slf4j
public class RabbitMqService {
@Autowired
public RabbitTemplate rabbitTemplate;
/**
* 发送 MQ 消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param value 放入mq中的消息体(需要对象)
* @param messagePostProcessor 消息后处理器(自定义处理)
* @param correlationData 相关数据(用于传递唯一标识,跟踪绑定数据信息)
*/
public <T> void sendMqMessage(String exchange, String routingKey,
T value, MessagePostProcessor messagePostProcessor,CorrelationData correlationData) {
// 推送到Mq中
rabbitTemplate.convertAndSend(exchange, routingKey, JSON.toJSONString(value), messagePostProcessor,correlationData);
}
/**
* 发送 MQ 消息
*
* @param exchange 交换机
* @param routingKey 路由键
* @param value 放入mq中的消息体(需要对象)
* @param messagePostProcessor 消息后处理器(自定义处理)
*/
public <T> void sendMqMessage(String exchange, String routingKey,
T value, MessagePostProcessor messagePostProcessor) {
// 唯一标识用于判断消息身份和内容
String messageId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(messageId);
// 推送到Mq中
String jsonString = JSON.toJSONString(value);
log.info("发送 MQ 消息! exchange: {} , routingKey: {} , messageId: {} , jsonString: {}",exchange,routingKey,messageId,jsonString);
rabbitTemplate.convertAndSend(exchange, routingKey, jsonString, messagePostProcessor,correlationData);
}
/**
* 发送短信信息
*
* @param value 短信信息
* @param messagePostProcessor 消息后处理器(自定义处理)
*/
public <T> void sendSmsMessage(T value, MessagePostProcessor messagePostProcessor) {
// 推送到Mq中
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.SMS_ROUTING_KEY, JSON.toJSONString(value), messagePostProcessor);
}
}
spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.aasee.rabbitmq.configure.RabbitMQConfig, \
com.aasee.rabbitmq.service.RabbitMqService, \
com.aasee.rabbitmq.configure.CustomMessageInterceptor
以上就是sdk的所有配置内容,使用bean的自动装配原理让这个sdk被引入时可以自动被spring托管配置。
接下来就是展示如何使用sdk的,同时展示nacos配置或者说application.yml配置
引入SDK
<dependency>
<groupId>com.aasee</groupId>
<artifactId>sms-rabbitmq-starter</artifactId>
<version>3.6.3</version>
</dependency>
可以把这个sdk放到阿里云制品仓库,或者自建私服,又或是直接托管到maven中央仓库,这样你的小伙伴们就能直接引入你的sdk
application.yml
# rabbitmq 配置
rabbitmq:
host: xx.xx.xxx.xxx
virtual-host: /cloudsmsTest
username: root
password: xxxxxx
port: 5672
publisher-confirm-type: correlated
publisher-returns: true #是否开启发送端消息抵达队列的确认
template:
mandatory: true # 只要消息没有正确抵达队列,以异步方式优先执行我们自己设置的回调,设置交换机处理失败消息的模式
listener:
simple:
acknowledge-mode: manual
prefetch: 1 #更改为每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送
定时上下线
在开发中我发现了一个有趣的需求,定时上线消费,定时下线停止消费,生产者可以持续往队列里发送消息,但是消费者则可以在指定时间,或者通过手动的方式上下线,以下是具体实现方法
com.aasee.smsconsumer.scheduler.ConsumerScheduler
import com.aasee.common.core.constant.ChannelConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class ConsumerScheduler implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private RabbitListenerEndpointRegistry registry;
// 每天早上8点启动消费者
// todo 创建接口用于手动启动
@Scheduled(cron = "0 0 8 * * ?")
public void startListening() {
// RabbitListenerEndpointRegistry registry = applicationContext.getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, RabbitListenerEndpointRegistry.class);
startConsumer();
}
public void startConsumer() {
if (!registry.getListenerContainer(ChannelConstant.SMSCONSUMER).isRunning()) {
registry.getListenerContainer(ChannelConstant.SMSCONSUMER).start();
log.info("smsConsumer 开启监听");
}
if (!registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).isRunning()) {
registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).start();
log.info("reissueConsumer 开启监听");
}
}
// 每天晚上7点停止消费者
@Scheduled(cron = "0 0 19 * * ?")
public void stopListening() {
// RabbitListenerEndpointRegistry registry = applicationContext.getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, RabbitListenerEndpointRegistry.class);
if (registry.getListenerContainer(ChannelConstant.SMSCONSUMER).isRunning()) {
registry.getListenerContainer(ChannelConstant.SMSCONSUMER).stop();
log.info("smsConsumer 停止监听");
}
if (registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).isRunning()) {
registry.getListenerContainer(ChannelConstant.REISSUECONSUMER).stop();
log.info("reissueConsumer 停止监听");
}
}
// 这个是为了每次启动或者重启服务后马上开始消费,可以取消
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
log.info("服务启动开启消费!");
startConsumer();
}
}
注意
如何你们存在这种定时上下线的需求,需要在**@RabbitListener**注解上加多一个参数 autoStartup = “false” ,这样消费者就不会自动消费消息了
@RabbitListener(id = ChannelConstant.REISSUECONSUMER,queues = RabbitMQConfig.REISSUE_QUEUE_NAME,autoStartup = "false")
原文地址:https://blog.csdn.net/weixin_45801664/article/details/142353479
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!