【RabbitMQ】rabbitmq广播模式的使用
前言:
项目需要同步另一个系统的数据,对方系统采用MQ的发布/订阅模式方便我们同步数据,即当对方系统中的某条数据修改后,会向绑定他们交换机的每一个队列发布消息。消费者(即我们)监听到消息变动,进行信息消费同步至我们库中。
我们需要做的就是:
1、创建一个新队列绑定到对方系统的交换机
2、将监听到的消息进行合理解析,取出消息中的请求头:
请求头信息为:"R" ,则代表该生为入学操作;
请求头信息为:"X" ,则代表该生为休学操作;
请求头信息为:"T" ,则代表该生为退学操作;
3、接下来根据获取到的请求头内容,来对对方系统传来的数据进行对应操作。
上代码,看思路:
实现1:
/**
* @Author: 宁兴星
* @CreateTime: 2026-01-16 14:05
* @Description: TODO
*/
@Configuration
public class RabbitMqConfig extends AbstractRabbitMQConfig {
/**
* 创建广播模式交换机(扇形)
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EventConstant.STUDENT_EXCHANGE, true, false);
}
/**
* 创建被监听的队列
*/
@Bean
public Queue dealerInfoQueue() {
return new Queue(EventConstant.STUDENT_QUEUE, true, false, false);
}
/**
* 将队列绑定到扇形交换机上,实现广播模式消息接收
*
* @param dealerInfoQueue
* @param fanoutExchange
* @return
*/
@Bean
public Binding binding(Queue dealerInfoQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(dealerInfoQueue).to(fanoutExchange);
}
/**
* 配置消息监听容器工厂
*
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(10);
return factory;
}
}
实现2:
/**
* MQ监听学生数据变更
*
* @param message 消息体
* @param deliveryTag 消息标识
* @param channel 通道
* @throws IOException IO异常
*/
@RabbitListener(queues = EventConstant.STUDENT_QUEUE)
@Operation(summary = "MQ监听学生数据变更", description = "MQ监听学生数据变更")
public void handleMessage(Message message,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
Channel channel) throws IOException {
try {
log.info("同步学生数据,接收到MQ消息: {}", message);
// 解析学生信息
StudentInfo studentInfo = parseStudentInfo(message);
log.info("解析后的学生数据: {}", studentInfo);
// 获取操作类型并处理
String action = getActionFromHeaders(message);
processStudentAction(action, studentInfo);
// 确认消息处理完成
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("处理学生数据消息异常: ", e);
// 消息处理失败,重新入队
channel.basicNack(deliveryTag, false, true);
}
}
/**
* 解析消息中的学生信息
*/
private StudentInfo parseStudentInfo(Message message) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(message.getBody(), StudentInfo.class);
}
/**
* 从消息头获取action
*/
private String getActionFromHeaders(Message message) {
Map<String, Object> headers = message.getMessageProperties().getHeaders();
String action = headers.get("action").toString();
log.info("操作类型: {}", action);
return action;
}
实现3:
/**
* 根据不同action处理学生数据
*/
private void processStudentAction(String action, StudentInfo studentInfo) {
if (action == null) {
return;
}
switch (action) {
case EventConstant.LIGHT_UP:
// 编写对应录取方法,此处省略具体信息
handleLightUp(studentInfo);
break;
case EventConstant.OFFLINE:
// 编写对应休学方法,此处省略具体信息
handleOffline(studentInfo);
break;
case EventConstant.DELETE:
// 编写对应退学方法,此处省略具体信息
handleDelete(studentInfo);
break;
default:
log.warn("未知的操作类型: {}", action);
}
}
结束啦,如有错误,敬请雅正!
原文地址:https://blog.csdn.net/wjjjjxxxx/article/details/145186901
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!