基于注解实现去重表消息防止重复消费
基于注解实现去重表消息防止重复消费
1. 背景/问题
在分布式系统中,消息队列(如RocketMQ、Kafka)的 消息重复消费 是常见问题,主要原因包括:
- 网络抖动:生产者或消费者因网络不稳定触发消息重发。
- 消费者超时:消费者处理时间过长,消息队列误判为失败并重新投递。
- 集群故障转移:消费者宕机后,未完成的消息会被其他节点重新拉取。
重复消费带来的问题:
- 业务逻辑多次执行(如重复扣款、重复生成订单)。
- 数据一致性被破坏(如库存超卖、积分累加错误)。
- 系统资源浪费,影响性能和稳定性。
为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。
2. 什么是幂等性
幂等性 是指对同一操作的多次执行所产生的影响与一次执行的影响相同。
- 消息消费场景:无论消息被消费多少次,最终结果应与消费一次一致。
- 实现目标:通过幂等设计,确保业务逻辑的重复执行不会产生副作用。
3. 幂等设计
核心思路
- 幂等标识:为每条消息生成唯一标识(如业务ID + 消息ID),记录其处理状态。
- 状态管理:通过数据库或Redis维护幂等标识的状态(如“消费中”“已消费”)。
- 过期时间:防止因系统崩溃导致状态长期滞留,需设置合理的超时时间(如10分钟)。
[消费者接收消息]
│
▼
[解析消息,生成唯一幂等标识]
│
▼
[查询幂等标识状态]
│
┌───────┴───────┐
│ 存在且已消费 │ [返回成功,丢弃消息]
└───────┬───────┘
│
┌───────┴───────┐
│ 存在且消费中 │ [延迟消费,等待重试]
└───────┬───────┘
│
┌───────┴───────┐
│ 不存在 │
└───────┬───────┘
│
[设置幂等标识为“消费中”,并设置过期时间]
│
▼
[执行业务逻辑]
│
▼
[业务执行成功?]
│
┌───────┴───────┐
│ 是 │ [更新标识为“已消费”]
│ │ [删除或保留标识]
└───────┬───────┘
│
┌───────┴───────┐
│ 否 │ [删除标识,允许重试]
└───────┬───────┘
│
▼
[流程结束]
4.抽象通用幂等组件
消息防重复消费幂等组件是通用的通常会提取出来也可供其他模块/服务 使用
4.1自定义幂等注解
提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {
/**
* 设置防重令牌 Key 前缀
*/
String keyPrefix() default "";
/**
* 通过 SpEL 表达式生成的唯一 Key
*/
String key();
/**
* 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
*/
long keyTimeout() default 3600L;
}
4.2. 定义幂等枚举
幂等需要设置两个状态,消费中和已消费,创建对应的枚举
@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {
/**
* 消费中
*/
CONSUMING("0"),
/**
* 已消费
*/
CONSUMED("1");
@Getter
private final String code;
/**
* 如果消费状态等于消费中,返回失败
*
* @param consumeStatus 消费状态
* @return 是否消费失败
*/
public static boolean isError(String consumeStatus) {
return Objects.equals(CONSUMING.code, consumeStatus);
}
}
4.3.通过 AOP 的方式进行增强注解
如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行
@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {
private final StringRedisTemplate stringRedisTemplate;
private static final String LUA_SCRIPT = """
local key = KEYS[1]
local value = ARGV[1]
local expire_time_ms = ARGV[2]
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
""";
/**
* 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑
*/
@Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")
public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);
String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
String absentAndGet = stringRedisTemplate.execute(
RedisScript.of(LUA_SCRIPT, String.class),
List.of(uniqueKey),
IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout()))
);
// 如果不为空证明已经有
if (Objects.nonNull(absentAndGet)) {
boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
if (errorFlag) {
throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));
}
return null;
}
Object result;
try {
// 执行标记了消息队列防重复消费注解的方法原逻辑
result = joinPoint.proceed();
// 设置防重令牌 Key 过期时间,单位秒
stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);
} catch (Throwable ex) {
// 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费
stringRedisTemplate.delete(uniqueKey);
throw ex;
}
return result;
}
/**
* @return 返回自定义防重复消费注解
*/
public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
return targetMethod.getAnnotation(NoMQDuplicateConsume.class);
}
lua脚本解释
local key = KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value = ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms = ARGV[2] # 第二个参数,即幂等 Key 过期时间
return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
该脚本的主要作用是:在 Redis 中尝试以 NX
方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。
获取到 Redis 里面的 Key 值后,可能会有三个流程执行:
absentAndGet
为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。
absentAndGet
为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。
absentAndGet
为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。
4.4.注册为 Spring Bean
另外可以看看另一篇基于分布式锁注解防重复提交
https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501
public class IdempotentConfiguration {
/**
* 防止消息队列消费者重复消费消息切面控制器
*/
@Bean
public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {
return new NoMQDuplicateConsumeAspect(stringRedisTemplate);
}
}
4.5EL工具类
public class SpELUtil {
/**
* 校验并返回实际使用的 spEL 表达式
*
* @param spEl spEL 表达式
* @return 实际使用的 spEL 表达式
*/
public static Object parseKey(String spEl, Method method, Object[] contextObj) {
List<String> spELFlag = ListUtil.of("#", "T(");
Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();
if (optional.isPresent()) {
return parse(spEl, method, contextObj);
}
return spEl;
}
/**
* 转换参数为字符串
*
* @param spEl spEl 表达式
* @param contextObj 上下文对象
* @return 解析的字符串值
*/
public static Object parse(String spEl, Method method, Object[] contextObj) {
DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression(spEl);
String[] params = discoverer.getParameterNames(method);
StandardEvaluationContext context = new StandardEvaluationContext();
if (ArrayUtil.isNotEmpty(params)) {
for (int len = 0; len < params.length; len++) {
context.setVariable(params[len], contextObj[len]);
}
}
return exp.getValue(context);
}
}
5.实战使用
使用天机学堂项目来进行实战
5.1写入common模块
5.2使用
直接加上注解就可以
但是实际上这里不存在幂等问题,因为userId和courseId设置了唯一索引,所以这里不存在幂等性,不需要加上幂等注解
原文地址:https://blog.csdn.net/sjsjsbbsbsn/article/details/145278390
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!