自学内容网 自学内容网

支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)

支持异步线程自动传递上下文(例如当前请求)的工具类(支持自定义上下文传递逻辑,支持拦截所有异步操作)

当我们使用异步线程去执行一些耗时操作的时候,这些异步操作中可能需要获取当前请求等上下文信息
若未做传递逻辑默认是获取不到的,因此写了一个自动传递的工具和切面,可使用工具手动调用时会自动拷贝上下文信息,加载切面后会拦截所有异步操作自动拷贝上下文信息。
同时,上下文信息的加载拷贝移除逻辑也可实现接口自定义,自行扩展。

和阿里巴巴TransmittableThreadLocal(TTL)类似,多支持了可以自定义上下文传递逻辑,你可以认为是阿里TTL手写版本知乎介绍阿里TTL原理
在这里插入图片描述

使用示范

异步执行lambda表达式中的代码,代码中获取当前请求的URL并打印
若未使用该工具,是无法获取到当前请求的。

ContextSupportedAsyncUtil.execute(()->{System.out.println("我在异步线程中获取到的当前请求URL是"+((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest().getRequestURI());});

ContextSupportedAsyncUtil .java


import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * @author humorchen
 * date: 2024/7/30
 * description: 支持上下文自动传递的异步工具
 * 默认支持自动传递 HttpServletRequest 到异步线程中
 * 其他上下文可自行创建类实现 AsyncContextInjector 接口,并调用ContextSupportedAsyncUtil.registerContextInjector 将其注册上去。
 * 可参考 SpringWebRequestContextInjector。class
 **/
@Slf4j
public class ContextSupportedAsyncUtil {
    private static final int CORE_SIZE = 8;
    private static final int MAX_SIZE = 32;
    private static final int QUEUE_SIZE = 1024;
    private static final int KEEP_ALIVE = 5;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.MINUTES;
    private static final ArrayBlockingQueue<Runnable> QUEUE = new ArrayBlockingQueue<>(QUEUE_SIZE);
    private static final AtomicInteger THREAD_NUM = new AtomicInteger(0);
    private static final RejectedExecutionHandler REJECT_POLICY = new ThreadPoolExecutor.CallerRunsPolicy();
    private static final List<Class<? extends AsyncContextInjector>> ASYNC_CONTEXT_INJECTOR_CLS_LIST = new ArrayList<>();
    private static ContextSupportedThreadPoolExecutor EXECUTOR = new ContextSupportedThreadPoolExecutor(CORE_SIZE, MAX_SIZE, KEEP_ALIVE, KEEP_ALIVE_UNIT, QUEUE, (r) -> new Thread(r, "AsyncUtil-thread-" + THREAD_NUM.incrementAndGet()), REJECT_POLICY);

    static {
        // 默认支持spring mvc 的RequestHolder自动传递到异步线程中
        ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(SpringWebRequestContextInjector.class);
    }

    /**
     * 支持自定义上下文传递的executor
     */
    public static class ContextSupportedThreadPoolExecutor extends ThreadPoolExecutor {

        public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }

        public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        public ContextSupportedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        /**
         * @param command the task to execute
         */
        @Override
        public void execute(@NonNull Runnable command) {
            super.execute(command instanceof AsyncRunnable ? command : getAsyncRunnable(command));
        }
    }

    /**
     * 异步上下文注入器接口
     * 将A线程的上下文注入到执行Runnable的B线程,并在执行后清除
     */
    public interface AsyncContextInjector {
        /**
         * 初始化
         * 读取A线程的上下文并保存到当前对象
         */
        void init();

        /**
         * 注入上下文信息
         * 注入init阶段存储的上下文到B线程的上下文中
         */
        void inject();

        /**
         * 移除上下文信息
         * 清理B线程刚注入的上下文
         */
        void remove();
    }

    /**
     * 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest
     */
    public static class SpringWebRequestContextInjector implements AsyncContextInjector {
        private ServletRequestAttributes requestAttributes;

        /**
         * 初始化
         * 读取A线程的上下文并保存到当前对象
         */
        @Override
        public void init() {
            requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        }

        /**
         * 注入上下文信息
         * 注入init阶段存储的上下文到B线程的上下文中
         */
        @Override
        public void inject() {
            RequestContextHolder.setRequestAttributes(requestAttributes);
        }

        /**
         * 移除上下文信息
         * 清理B线程刚注入的上下文
         */
        @Override
        public void remove() {
            RequestContextHolder.resetRequestAttributes();
        }

    }

    /**
     * 抽象的注入器,将自定义的每个注入器执行
     */
    public static class AbstractAsyncContextInjector {
        private final List<AsyncContextInjector> list = new ArrayList<>(ASYNC_CONTEXT_INJECTOR_CLS_LIST.size());

        public void init() {
            // 初始化每个异步上下文注入器
            for (Class<? extends AsyncContextInjector> aClass : ASYNC_CONTEXT_INJECTOR_CLS_LIST) {
                try {
                    AsyncContextInjector asyncContextInjector = aClass.newInstance();
                    asyncContextInjector.init();
                    list.add(asyncContextInjector);
                } catch (Exception e) {
                    log.error("AbstractAsyncContextInjector init error", e);
                }
            }
        }

        public void inject() {
            for (AsyncContextInjector asyncContextInjector : list) {
                try {
                    asyncContextInjector.inject();
                } catch (Exception e) {
                    log.error("AbstractAsyncContextInjector inject error", e);
                }
            }
        }

        public void remove() {
            for (AsyncContextInjector asyncContextInjector : list) {
                try {
                    asyncContextInjector.remove();
                } catch (Exception e) {
                    log.error("AbstractAsyncContextInjector remove error", e);
                }
            }
        }
    }

    /**
     * 支持异步线程传递自定义上下文时使用的Runnable包装类
     */
    public static class AsyncRunnable extends AbstractAsyncContextInjector implements Runnable {
        private final Runnable runnable;


        public AsyncRunnable(Runnable runnable) {
            // 初始化保存A线程上下文信息
            init();
            this.runnable = runnable;
        }


        @Override
        public void run() {
            if (runnable == null) {
                return;
            }
            try {
                // 将保存的A线程的上下文信息恢复到B线程
                inject();
                runnable.run();
            } catch (Exception e) {
                log.error("【ContextSupportedAsyncUtil】 run error {}", e.getMessage());
                throw e;
            } finally {
                // 清理B线程的上下文
                remove();
            }
        }

    }

    /**
     * 异步执行任务
     *
     * @param runnable
     */
    public static void execute(Runnable runnable) {
        EXECUTOR.execute(runnable);
    }

    /**
     * 异步执行任务
     *
     * @param runnable
     */
    public static void submit(Runnable runnable) {
        execute(runnable);
    }

    /**
     * 执行异步任务并获取返回值
     *
     * @param supplier
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<T> execute(Supplier<T> supplier) {
        return CompletableFuture.supplyAsync(supplier, EXECUTOR);
    }

    /**
     * 获取线程池
     *
     * @return
     */
    public static ThreadPoolExecutor getExecutor() {
        return EXECUTOR;
    }

    /**
     * 设置本工具使用的线程池
     *
     * @param executor
     */
    public static void setExecutor(ContextSupportedThreadPoolExecutor executor) {
        if (executor == null) {
            throw new NullPointerException("executor 不得为空");
        }
        if (executor.isShutdown() || executor.isTerminated() || executor.isTerminating()) {
            throw new IllegalStateException("executor 状态不得为shutdown");
        }
        ContextSupportedThreadPoolExecutor old = EXECUTOR;
        EXECUTOR = executor;
        if (old != null) {
            old.shutdown();
        }
    }

    /**
     * 获取支持上下文注入的Runnable
     *
     * @param runnable
     * @return 包装后的runnable
     */
    public static AsyncRunnable getAsyncRunnable(Runnable runnable) {
        return new AsyncRunnable(runnable);
    }

    /**
     * 注册注入器
     *
     * @param cls
     */
    public static void registerContextInjector(Class<? extends AsyncContextInjector> cls) {
        if (cls != null && !ASYNC_CONTEXT_INJECTOR_CLS_LIST.contains(cls)) {
            ASYNC_CONTEXT_INJECTOR_CLS_LIST.add(cls);
        }
    }
}

自动拦截所有异步线程池操作

ContextSupportedExecutorAspect.java



import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

/**
 * @author humorchen
 * date: 2024/8/7
 * description: 支持上下文的executor
 **/
@Component
@Aspect
@Slf4j
public class ContextSupportedExecutorAspect {

    
    @Around("execution(* java.util.concurrent.Executor.execute(java.lang.Runnable))")
    public Object contextSupportedExecutor(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        if (args != null && args.length > 0) {
            Object arg = args[0];
            if (arg instanceof Runnable && !(arg instanceof ContextSupportedAsyncUtil.AsyncRunnable)) {
                args[0] = ContextSupportedAsyncUtil.getAsyncRunnable((Runnable) arg);
            }
        }
        return joinPoint.proceed(args);
    }


}

自定义上下文注入逻辑示范

SpringWebRequestContextInjector.java
异步线程上下文自动注入当前请求(默认提供)

/**
     * 支持传递ServletRequestAttributes对象用于获取当前请求HttpServletRequest
     */
    public static class SpringWebRequestContextInjector implements AsyncContextInjector {
        private ServletRequestAttributes requestAttributes;

        /**
         * 初始化
         * 读取A线程的上下文并保存到当前对象
         */
        @Override
        public void init() {
            requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        }

        /**
         * 注入上下文信息
         * 注入init阶段存储的上下文到B线程的上下文中
         */
        @Override
        public void inject() {
            RequestContextHolder.setRequestAttributes(requestAttributes);
        }

        /**
         * 移除上下文信息
         * 清理B线程刚注入的上下文
         */
        @Override
        public void remove() {
            RequestContextHolder.resetRequestAttributes();
        }

    }

在这里插入图片描述


原文地址:https://blog.csdn.net/HumorChen99/article/details/142364498

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!