接口 V2 完善:基于责任链模式、Canal 监听 Binlog 实现数据库、缓存的库存最终一致性
🎯 本文介绍了一种使用Canal监听MySQL Binlog实现数据库与缓存最终一致性的方案。文章首先讲解了如何修改Canal配置以适应订单表和时间段表的变化,然后详细描述了通过责任链模式优化消息处理逻辑的方法,确保能够灵活应对不同数据表的更新需求。最后,展示了如何利用RocketMQ消费Canal消息并通过责任链处理器同步更新缓存,从而保证数据的一致性。此方法有效提升了系统的可扩展性和维护效率。
🏠️ HelloDam/场快订(场馆预定 SaaS 平台)
前言
在上一篇文章中,使用令牌限流方式来实现时间段的预定。时间段令牌和时间段库存缓存是分离的,因此需要额外对库存缓存进行更新,如何实现数据库与缓存的数据一致性是一个常见问题。本文使用 Canal 监听 Binlog 实现数据库、缓存的库存最终一致性。为什么使用这种方案?不了解的读者可以先阅读文章:https://zhuanlan.zhihu.com/p/408515044
不了解 MySQL Binlog 开启和 Canal 安装与配置的朋友请先阅读小白手把手教程:https://hellodam.blog.csdn.net/article/details/144483823
修改Canal配置
修改instance.properties
的过滤规则为canal.instance.filter.regex=^(venue-reservation)\\.(time_period_order_([0-9]|1[0-9]|2[0-9]|3[0-1])|time_period_([0-9]|1[0-5]))$
。现在不仅需要考虑订单表time_period_order
,还要考虑时间段表time_period
,因为现在要保证时间段库存和空闲场号的数据库、缓存一致性
修改 Canal 消息消费逻辑
之前的实现如下
import cn.hutool.core.util.ObjectUtil;
import com.vrs.annotation.Idempotent;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author dam
* @create 2024/9/20 21:30
*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,
consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {
private final TimePeriodService timePeriodService;
/**
* 消费消息的方法
* 方法报错就会拒收消息
*
* @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
*/
@Idempotent(
uniqueKeyPrefix = "canal_binlog_common:",
key = "#canalBinlogDTO.getId()+''",
scene = IdempotentSceneEnum.MQ,
keyTimeout = 3600L
)
@SneakyThrows
@Override
public void onMessage(CanalBinlogDTO canalBinlogDTO) {
if (canalBinlogDTO.getOld() == null) {
return;
}
Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);
Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);
if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {
log.info("[消费者] 消费canal的消息,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));
Long userId = Long.parseLong(alterDataMap.get("user_id").toString());
Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());
Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());
Long courtIndex;
if (alterDataMap.containsKey("partition_index")) {
courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());
} else {
courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());
}
Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());
Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());
if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {
// 恢复库存
timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder()
.userId(userId)
.courtIndex(courtIndex)
.timePeriodId(timePeriodId)
.partitionId(partitionId)
.build());
}
}
}
}
因为之前只需要处理订单表即可,现在还需要处理时间段表的更改。所以需要做两件事:
- 识别是哪个数据表更新了
- 根据所识别的表,执行相应的业务逻辑
识别修改哪个数据表
首先需要了解 Canal 发送的消息内容格式,从下图可以看到有数据库名,但没有表名。因此为了识别数据表,只能通过表的独有字段名来识别了
如何实现根据表执行相应操作
一种方式是,直接在onMessage
方法中,识别完数据表类型之后,调用相应的方法来处理。这种方式实现简单,但后续如果要处理新的表,需要修改代码,违反了开闭原则。
为了解决这个问题,本文使用责任链模式,即封装多个处理器到责任链上,每个处理器负责识别一个表,并进行相应的业务逻辑。后续使用时就依次调用链上的处理器,如果处理器发现是自己的职责,就执行逻辑,否则直接返回,调用下一个处理器。
责任链模式框架
【抽象处理器】
package com.vrs.chain_of_responsibility;
/**
* @Author dam
* @create 2024/12/11 19:18
*/
public interface AbstractChainHandler<T> {
/**
* 由实现类来实现具体的处理方法
*/
boolean handle(T param);
/**
* 名称,用来区分不同的责任链
*/
String name();
/**
* 处理器的排序
*/
int order();
}
handler
:由具体的处理器来实现,用来实现业务逻辑name
:用来标识责任链,返回相同名字的处理器被归到一个责任链中order
:用来给同一责任链的处理器排序
【责任链上下文】
该类用来管理不同的责任链。
- 当Spring启动时,执行
run
方法,通过获取AbstractChainHandler
的实现类来初始化所有责任链,即将处理器按照name
划分到不同的责任链中,后面可以通过容器chainContainer
来获取。最后对同一链上的处理器按照sort
升序排序。 - 当调用
handler
方法时,会根据name
获取责任链,然后依次调用链上的处理器来进行业务处理,若有任意处理器的handle
方法返回 true ,责任链就会中断。如果想要依次执行所有处理器,那所有处理器都返回 false 即可
package com.vrs.chain_of_responsibility;
import org.springframework.beans.BeansException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
/**
* @Author dam
* @create 2024/12/11 19:20
*/
@Component
public class ChainContext<T> implements ApplicationContextAware, CommandLineRunner {
/**
* 通过 Spring IOC 获取 Bean 实例
*/
private ApplicationContext applicationContext;
/**
* key:责任链名称
* value:责任链
*/
private final Map<String, List<AbstractChainHandler>> chainContainer = new HashMap<>();
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Override
public void run(String... args) {
// 从 Spring IOC 容器中获取接口对应的 Spring Bean 集合
Map<String, AbstractChainHandler> chainFilterMap = applicationContext.getBeansOfType(AbstractChainHandler.class);
chainFilterMap.forEach((beanName, bean) -> {
// 判断 name 是否已经存在抽象责任链容器中
// 如果已经存在直接向集合新增
// 如果不存在,创建对应的集合
List<AbstractChainHandler> abstractChainHandlers = chainContainer.getOrDefault(bean.name(), new ArrayList<>());
abstractChainHandlers.add(bean);
chainContainer.put(bean.name(), abstractChainHandlers);
});
chainContainer.forEach((mark, unsortedChainHandlers) -> {
// 对每个责任链的实现类根据order升序排序
Collections.sort(unsortedChainHandlers, ((o1, o2) -> {
return Integer.compare(o1.order(), o2.order());
}));
});
}
/**
* 责任链组件执行
*
* @param name 责任链组件标识
* @param requestParam 请求参数
*/
public void handler(String name, T requestParam) {
// 根据 name 从责任链容器中获取对应的责任链
List<AbstractChainHandler> abstractChainHandlers = chainContainer.get(name);
if (CollectionUtils.isEmpty(abstractChainHandlers)) {
throw new RuntimeException(name + "对应的责任链不存在");
}
// 遍历责任链处理器
for (AbstractChainHandler handler : abstractChainHandlers) {
if (handler.handle(requestParam)) {
// --if-- 如果处理器返回 true,表示已经处理完成,退出责任链
return;
}
}
}
}
常量
package com.vrs.constant;
/**
* Redis缓存Key常量类
*/
public class ChainConstant {
public static final String RESERVE_CHAIN_NAME = "reserve_chain";
public static final String CANAL_CHAIN_NAME = "canal_chain";
}
具体处理器
由于修改的要么是订单表,要么是时间段表,因此责任链上面只要有任一处理器成功处理,即返回 true ,就无须调用余下的其他处理器
【时间段库存修改处理器】
当数据库中的库存修改之后,同步修改缓存中的库存
package com.vrs.service.chainHander.canal;
import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RedisCacheConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.service.PartitionService;
import com.vrs.service.TimePeriodService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 订单超时关闭处理逻辑
*
* @Author dam
* @create 2024/12/11 19:43
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class TimePeriodStockChangeHandler implements AbstractChainHandler<CanalBinlogDTO> {
private final StringRedisTemplate redisTemplate;
private final TimePeriodService timePeriodService;
private final PartitionService partitionService;
@Override
public boolean handle(CanalBinlogDTO canalBinlogDTO) {
Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);
Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);
if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("stock")) {
// --if-- 如果是修改操作,且修改了stock
log.info("[消费者] 消费canal的消息,时间段库存修改,同步修改缓存的库存,时间段ID:{}", alterDataMap.get("id"));
Long timePeriodId = Long.parseLong(alterDataMap.get("id").toString());
Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());
Integer stock = Integer.parseInt(alterDataMap.get("stock").toString());
Long bookedSlots = Long.parseLong(alterDataMap.get("booked_slots").toString());
// 更新库存
redisTemplate.opsForValue().set(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_KEY, timePeriodId), stock.toString());
// 更新位图
timePeriodService.initializeFreeIndexBitmap(
String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_KEY, timePeriodId),
partitionService.getPartitionDOById(partitionId).getNum(),
bookedSlots,
24 * 3600);
return true;
}
return false;
}
@Override
public String name() {
return ChainConstant.CANAL_CHAIN_NAME;
}
@Override
public int order() {
return 10;
}
}
【订单超时关闭处理器】
package com.vrs.service.chainHander.canal;
import cn.hutool.core.util.ObjectUtil;
import com.vrs.chain_of_responsibility.AbstractChainHandler;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.OrderStatusConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.domain.dto.req.TimePeriodStockRestoreReqDTO;
import com.vrs.service.TimePeriodService;
import lombok.extern.slf4j.Slf4j;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 订单超时关闭处理逻辑
*
* @Author dam
* @create 2024/12/11 19:43
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderCloseHandler implements AbstractChainHandler<CanalBinlogDTO> {
private final TimePeriodService timePeriodService;
@Override
public boolean handle(CanalBinlogDTO canalBinlogDTO) {
Map<String, Object> alterDataMap = canalBinlogDTO.getData().get(0);
Map<String, Object> oldDataMap = canalBinlogDTO.getOld().get(0);
if (ObjectUtil.equal(canalBinlogDTO.getType(), "UPDATE") && oldDataMap.containsKey("order_status")) {
log.info("[消费者] 消费canal的消息,订单超时关闭,恢复时间段的库存和空闲场号,时间段ID:{}", alterDataMap.get("time_period_id"));
Long userId = Long.parseLong(alterDataMap.get("user_id").toString());
Long timePeriodId = Long.parseLong(alterDataMap.get("time_period_id").toString());
Long partitionId = Long.parseLong(alterDataMap.get("partition_id").toString());
Long courtIndex;
if (alterDataMap.containsKey("partition_index")) {
courtIndex = Long.parseLong(alterDataMap.get("partition_index").toString());
} else {
courtIndex = Long.parseLong(alterDataMap.get("court_index").toString());
}
Integer orderStatus = Integer.parseInt(alterDataMap.get("order_status").toString());
Integer oldOrderStatus = Integer.parseInt(oldDataMap.get("order_status").toString());
if (orderStatus.equals(OrderStatusConstant.CANCEL) && oldOrderStatus.equals(OrderStatusConstant.UN_PAID)) {
// 恢复库存
timePeriodService.restoreStockAndBookedSlots(TimePeriodStockRestoreReqDTO.builder()
.userId(userId)
.courtIndex(courtIndex)
.timePeriodId(timePeriodId)
.partitionId(partitionId)
.build());
}
return true;
}
return false;
}
@Override
public String name() {
return ChainConstant.CANAL_CHAIN_NAME;
}
@Override
public int order() {
return 0;
}
}
MQ 消费者调用责任链
使用非常简单,直接调用chainContext.handler(ChainConstant.
CANAL_CHAIN_NAME
, canalBinlogDTO);
即可
package com.vrs.rocketMq.listener;
import com.vrs.annotation.Idempotent;
import com.vrs.chain_of_responsibility.ChainContext;
import com.vrs.constant.ChainConstant;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.CanalBinlogDTO;
import com.vrs.enums.IdempotentSceneEnum;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @Author dam
* @create 2024/9/20 21:30
*/
@Slf4j(topic = RocketMqConstant.CANAL_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.CANAL_TOPIC,
consumerGroup = RocketMqConstant.CANAL_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING
)
@RequiredArgsConstructor
public class CanalBinlogCommonListener implements RocketMQListener<CanalBinlogDTO> {
private final ChainContext chainContext;
/**
* 消费消息的方法
* 方法报错就会拒收消息
*
* @param CanalBinlogDTO 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数
*/
@Idempotent(
uniqueKeyPrefix = "canal_binlog_common:",
key = "#canalBinlogDTO.getId()+''",
scene = IdempotentSceneEnum.MQ,
keyTimeout = 3600L
)
@SneakyThrows
@Override
public void onMessage(CanalBinlogDTO canalBinlogDTO) {
if (canalBinlogDTO.getOld() == null) {
// --if-- 如果不是修改数据,快速退出,因为我们现在的业务逻辑都是识别出数据修改才有下面的操作
return;
}
// 调用责任链来消费 canal 消息
chainContext.handler(ChainConstant.CANAL_CHAIN_NAME, canalBinlogDTO);
}
}
原文地址:https://blog.csdn.net/laodanqiu/article/details/145308548
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!