自学内容网 自学内容网

利用redis实现分布式定时任务

如果是微服务,两个服务都有定时,那么就出问题了,但是上分布式定时任务框架太麻烦怎么办,那么就用redis加个锁,谁先抢到锁谁执行

整个工具类



import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * 实现分布式redis锁.
 *
 * @author linzp
 * @version 1.0.0
 * CreateDate 2021/1/14 13:53
 */
@Component
public class RedisLockUtils {

/**
 * 锁名称前缀.
 */
public static final String TASK_LOCK_PREFIX = "TASK_LOCK_";

/**
 * redis操作.
 */
@Autowired
private StringRedisTemplate redisTemplate;

/**
 * 获取分布式redis锁.
 * 逻辑:
 * 1、使用setNX原子操作设置锁(返回 true-代表加锁成功,false-代表加锁失败)
 * 2、加锁成功直接返回
 * 3、加锁失败,进行监测,是否存在死锁的情况,检查上一个锁是否已经过期
 * 4、如果过期,重新让当前线程获取新的锁。
 * 5、这里可能会出现多个获取锁失败的线程执行到这一步,所以判断是否是加锁成功,如果没有,则返回失败
 *
 * @param taskName       任务名称
 * @param lockExpireTime 锁的过期时间
 * @return true-获取锁成功 false-获取锁失败
 */
public Boolean getLock(String taskName, long lockExpireTime) {
//锁的名称:前缀 + 任务名称
String lockName = TASK_LOCK_PREFIX + taskName;

return (Boolean) redisTemplate.execute((RedisCallback<?>) connection -> {
// 计算此次过期时间:当前时间往后延申一个expireTIme
long expireAt = System.currentTimeMillis() + lockExpireTime + 1;
// 获取锁(setNX 原子操作)
Boolean acquire = connection.setNX(lockName.getBytes(), String.valueOf(expireAt).getBytes());
// 如果设置成功
if (Objects.nonNull(acquire) && acquire) {
return true;
} else {
//防止死锁,获取旧的过期时间,与当前系统时间比是否过期,如果过期则允许其他的线程再次获取。
byte[] bytes = connection.get(lockName.getBytes());
if (Objects.nonNull(bytes) && bytes.length > 0) {
long expireTime = Long.parseLong(new String(bytes));
// 如果旧的锁已经过期
if (expireTime < System.currentTimeMillis()) {
// 重新让当前线程加锁
byte[] oldLockExpireTime = connection.getSet(lockName.getBytes(),
String.valueOf(System.currentTimeMillis() + lockExpireTime + 1).getBytes());
//这里为null证明这个新锁加锁成功,上一个旧锁已被释放
if (Objects.isNull(oldLockExpireTime)) {
return true;
}
// 防止在并发场景下,被其他线程抢先加锁成功的问题
return Long.parseLong(new String(oldLockExpireTime)) < System.currentTimeMillis();
}
}
}
return false;
});
}

/**
 * 删除锁.
 * 这里可能会存在一种异常情况,即如果线程A加锁成功
 * 但是由于io或GC等原因在有效期内没有执行完逻辑,这时线程B也可拿到锁去执行。
 * (方案,可以加锁的时候新增当前线程的id作为标识,释放锁时,判断一下,只能释放自己加的锁)
 *
 * @param lockName 锁名称
 */
public void delLock(String lockName) {
// 直接删除key释放redis锁
redisTemplate.delete(lockName);
}
}

然后弄个定时

/**
 * 测试方法,用于分布式定时任务的加锁
 */
//@Scheduled(cron = "0/5 * * * * *")
public void start(){
try {
Boolean lock = redisLockUtils.getLock("test", 3000);
//获取锁
if (lock) {
log.info("机器【1】上的 demo 定时任务启动了!>> 当前时间 [{}]", LocalDateTime.now());
//延迟一秒
Thread.sleep(1000);
}else {
log.error("获取锁失败了,【其他微服务在执行此定时】此次不执行!");
}
}catch (Exception e){
log.info("获取锁异常了!");
}finally {
//释放redis锁
redisLockUtils.delLock("test");
}
}

但是写起来还要改定时任务代码,贼麻烦,直接用切面 环绕的那种,然后抢到锁执行就行.



import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * 自定义redis锁注解.
 * 目的:把加锁解锁逻辑与业务代码解耦.
 *
 * CreateDate 2021/1/14 16:50
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskRedisLock {

/**
 * 定时任务名称.
 */
String taskName();

/**
 * 定时任务锁过期时间.
 */
long expireTime();
}

切面



import com.xhsoft.common.constant.TaskRedisLock;
import com.xhsoft.util.RedisLockUtils;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**
 * 定时任务锁切面,对加了自定义redis锁注解的任务进行拦截.
 *
 * @author linzp
 * @version 1.0.0
 * CreateDate 2021/1/14 16:59
 */
@Component
@Aspect
@Slf4j
public class RedisLockAspect {

//加锁工具类
@Autowired
private RedisLockUtils redisLockUtils;

/**
 * 拦截自定义的redis锁注解.
 */
@Pointcut("@annotation(com.xhsoft.common.constant.TaskRedisLock)")
public void pointCut(){}

/**
 * 环绕通知.
 */
@Around("pointCut()")
public Object Around(ProceedingJoinPoint pjp) throws Throwable {
//获取方法
MethodSignature methodSignature = (MethodSignature) pjp.getSignature();
Method method = methodSignature.getMethod();
//获取方法上的注解
TaskRedisLock annotation = method.getAnnotation(TaskRedisLock.class);
//获取任务名称
String taskName = annotation.taskName();
//获取失效时间
long expireTime = annotation.expireTime();
try {
//获取锁
Boolean lock = redisLockUtils.getLock(taskName, expireTime);
if (lock) {
return pjp.proceed();
}else {
log.error("[{} 任务] 获取redis锁失败了,此次不执行...", taskName);
}
}catch (Exception e){
log.error("[{} 任务]获取锁异常了!", taskName, e);
}finally {
//释放redis锁
redisLockUtils.delLock(taskName);
}
return null;
}
}

最后试试

/**
 * 使用自定义TaskRedisLock注解,通过aop来加锁.
 */
@TaskRedisLock(taskName = "task_1", expireTime = 4000)
@Scheduled(cron = "0/5 * * * * *")
public void run(){
log.info("task_1 定时任务启动了!>> 当前时间 [{}]", LocalDateTime.now());
try {
//延迟一秒
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}


原文地址:https://blog.csdn.net/weixin_42759398/article/details/142637779

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!