@RabbitListener处理重试机制完成后的异常捕获
- application.properties中配置开启手动签收
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 定义一个重试器
@Slf4j
@Configuration
public class RabbitMQRetryConfing {
@Bean("customRetry")
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3); // 设置重试次数
retryTemplate.setRetryPolicy(retryPolicy);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(2000); // 重试间隔,单位:毫秒
retryTemplate.setBackOffPolicy(backOffPolicy);
// 添加 RetryListener 以观察重试过程
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
//春姐,这里可以灵活配置重试失败后的回调,例如发送告警、更新数据库状态等
if (context.getRetryCount() >= retryPolicy.getMaxAttempts()) {
int retryCount = context.getRetryCount();
// 当重试次数耗尽时进行处理
log.info("已达到最大重试次数{}次,丢弃本次任务",retryCount);
}
}
});
return retryTemplate;
}
}
这个重试器可以配置最大重试次数、重试之间间隔次数等策略配置,调用重试器的execute方法可以进行队列消费,如果在执行一次任务期间发生了异常,则会根据配置的重试次数以及间隔时间自动触发下一次重试,每一次重试都是在同一个线程中执行完成的,并且RetryTemplate会为每一次重试失败进行回调,提供了诸如 onOpen、onClose、onError等回调时机。
- RabbitMQ监听器回调方法
@Autowired
@Qualifier("customRetry")
private RetryTemplate retryTemplate;
//queues消费的队列 ackMode确认模式 MANUAL 手动确认
@RabbitListener(queues = "q",ackMode = "MANUAL")
@Override
public void onMessage(Message message, Channel channel) {
try {
retryTemplate.execute(new RetryCallback<Object, Throwable>() {
@Override
public Object doWithRetry(RetryContext retryContext) throws Throwable {
// 消息的唯一标识id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("接收到mq信息" + new String(message.getBody()));
try{
//todo 开始业务处理
String msg = new String(message.getBody());
Integer articleId = Integer.parseInt(msg);
newsService.pullNews(articleId);
// 手动签收的第一个参数为消息的唯一标识id、第二个参数表示是否批量签收
channel.basicAck(deliveryTag,false);
log.info("消费mq消息成功,articleId为:{}",articleId);
return null;
}catch (Throwable e){
log.error(String.format("失败,异常信息为:%s",new String(message.getBody()),e.getMessage()));
//重新抛出异常 触发重试机制
throw e;
}
finally {
//重试次数达到限制
log.error(String.format("失败",new String(message.getBody())));
//不重新入队,发送到死信队列
// 手动拒绝签收的第一个参数为消息id、
// 第二个参数表示是否批量签收
// 第三个参数消息是否重回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
});
} catch (Throwable throwable) {
log.info("执行回调重试上下文出现异常");
}
}
这样,当耗尽完重试次数之后就会被回调到onError方法中执行自定义的异常逻辑处理。
原文地址:https://blog.csdn.net/qq_43750656/article/details/145282302
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!