自学内容网 自学内容网

java 线程池工具类

ThreadPoolUtils 线程池工具类,

package com.zzc.common.utils;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ThreadPoolUtils {

    private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR;

    private static final String COMMON_THREAD_POOL_KEY;

    private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY;

    private static final int DEFAULT_CORE_POOL_SIZE = 1;

    private static final int DEFAULT_KEEP_ALIVE_TIME = 30;

    private static final int DEFAULT_PROCESSORS = 2;

    private static int ALB_PROCESSORS = DEFAULT_PROCESSORS;

    static {
        THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();
        COMMON_THREAD_POOL_KEY = "COMMON";
        COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";
        int processors = Runtime.getRuntime().availableProcessors();
        ALB_PROCESSORS = processors;
        log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", ALB_PROCESSORS, DEFAULT_PROCESSORS);
        ALB_PROCESSORS = Math.max(ALB_PROCESSORS, DEFAULT_PROCESSORS);
    }

    /**
     *
     * @param threadPoolKey
     * @param corePoolSize
     * @param maxPoolSize
     * @param keepAliveTime
     * @param timeUnit
     * @param discardContinueWait 如果被拒绝,则等待时间,单位ms
     * @return
     */
    public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
                                                                       int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
                                                                       int discardContinueWait) {
        return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, (BlockingDeque<Runnable>) new SynchronousQueue(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
    }

    public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
                                                           int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
                                                           BlockingDeque<Runnable> workQueue, RejectedExecutionHandler handler) {
        if (StrUtils.isBlank(threadPoolKey)) {
            throw new RuntimeException("threadPoolKey is null");
        }
        if (THREAD_POOL_EXECUTOR.containsKey(threadPoolKey)) {
            return THREAD_POOL_EXECUTOR.get(threadPoolKey);
        }
        log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
        corePoolSize = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
        maxPoolSize = maxPoolSize <= 0 ? corePoolSize : maxPoolSize;
        keepAliveTime = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
        timeUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
        ThreadPoolExecutor executor = new PThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue, newThreadFactory(threadPoolKey), handler);
        log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);

        THREAD_POOL_EXECUTOR.put(threadPoolKey, executor);
        return executor;
    }

    public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
        ScheduledThreadPoolExecutor executorService = (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.get(COMMON_SCHEDULE_EXECUTOR_POOL_KEY);
        if (executorService == null) {
            executorService = new ScheduledThreadPoolExecutor(2, newThreadFactory(COMMON_SCHEDULE_EXECUTOR_POOL_KEY));
            THREAD_POOL_EXECUTOR.put(COMMON_SCHEDULE_EXECUTOR_POOL_KEY, executorService);
        }
        return executorService;
    }

    public static ThreadFactory newThreadFactory(String threadPrefix) {
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName(threadPrefix);
                thread.setDaemon(true);
                log.info("new thread:{}", threadPrefix);
                return thread;
            }
        };
        return threadFactory;
    }

    static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            String threadKey = "";
            if (e instanceof PThreadPoolExecutor) {
                threadKey = ((PThreadPoolExecutor) e).getThreadPoolKey();
            }
            if (r instanceof Thread) {
                log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
            } else {
                log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
            }
            super.rejectedExecution(r, e);
        }
    }

    /**
     * 影响线程执行效率
     */
    static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {

        private long discardContinueWait = 1;

        public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
            if (discardContinueWait <= 0) {
                discardContinueWait = 1;
            }
            this.discardContinueWait = discardContinueWait;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            String threadKey = "";
            if (r instanceof Thread) {
                log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), ((Thread) r).getClass().getSimpleName());
            } else {
                log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", threadKey, r.hashCode(), r.getClass().getSimpleName());
            }

            if (!executor.isShutdown()) {
                try {
                    executor.getQueue().poll(discardContinueWait, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                    log.error("rejectedExecution", e);
                }
                executor.execute(r);
            }
        }
    }

    static class PThreadPoolExecutor extends ThreadPoolExecutor {

        private String threadPoolKey;

        public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.threadPoolKey = threadPoolKey;
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (Exception e) {
                log.error("execute error.", e);
            }
            log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
                    command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (t != null) {
                log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
            }
            super.afterExecute(r, t);
        }

        public String getThreadPoolKey() {
            return threadPoolKey;
        }
    }

}


原文地址:https://blog.csdn.net/dashalen/article/details/140662872

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!