Java 线程池工具类
ThreadPoolUtils 线程池工具类
package com.zzc.common.utils;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Factory and registry for named thread pools.
 *
 * <p>Every executor created through this class is stored in a shared map under its key; asking for
 * a key that already exists returns the stored executor instead of building a new one. Threads
 * produced by {@link #newThreadFactory(String)} are daemon threads.
 */
@Slf4j
public class ThreadPoolUtils {

    /** All executors created by this class, keyed by pool name. */
    private static final Map<String, ThreadPoolExecutor> THREAD_POOL_EXECUTOR = new ConcurrentHashMap<>();

    private static final String COMMON_THREAD_POOL_KEY = "COMMON";

    /** Registry key of the shared scheduled executor. */
    private static final String COMMON_SCHEDULE_EXECUTOR_POOL_KEY = "SCHEDULE-COMMON";

    /** Core size used when the caller passes a non-positive value. */
    private static final int DEFAULT_CORE_POOL_SIZE = 1;

    /** Keep-alive used when the caller passes a non-positive value. */
    private static final int DEFAULT_KEEP_ALIVE_TIME = 30;

    private static final int DEFAULT_PROCESSORS = 2;

    /** Number of available processors, but never less than {@link #DEFAULT_PROCESSORS}. */
    private static final int ALB_PROCESSORS;

    static {
        int processors = Runtime.getRuntime().availableProcessors();
        log.info("availableProcessors:{}, DEFAULT_PROCESS:{}", processors, DEFAULT_PROCESSORS);
        ALB_PROCESSORS = Math.max(processors, DEFAULT_PROCESSORS);
    }

    /**
     * Creates (or returns the already registered) pool that hands tasks directly to worker threads
     * through a fair {@link SynchronousQueue} and, when a task is rejected, waits and resubmits it.
     *
     * @param threadPoolKey       registry key of the pool; must not be blank
     * @param corePoolSize        core size; non-positive values fall back to the default of 1
     * @param maxPoolSize         maximum size; non-positive values fall back to the core size
     * @param keepAliveTime       idle keep-alive; non-positive values fall back to the default of 30
     * @param timeUnit            unit of {@code keepAliveTime}; {@code null} means seconds
     * @param discardContinueWait time to wait before resubmitting a rejected task, in milliseconds
     * @return the executor registered under {@code threadPoolKey}
     * @throws IllegalArgumentException if {@code threadPoolKey} is blank
     */
    public static ThreadPoolExecutor newThreadPoolExecutorDirectAndAsy(String threadPoolKey, int corePoolSize,
            int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
            int discardContinueWait) {
        // SynchronousQueue is a BlockingQueue, not a BlockingDeque; casting it to BlockingDeque
        // (as this method used to) fails with ClassCastException on every call.
        return newThreadPoolExecutor(threadPoolKey, corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
                new SynchronousQueue<Runnable>(true), new DiscardSynchronousQueueWaitPolicy(discardContinueWait));
    }

    /**
     * Returns the executor registered under {@code threadPoolKey}, creating and registering it
     * first if the key is not present yet. When the key already exists all other arguments are
     * ignored.
     *
     * @param threadPoolKey registry key of the pool; must not be blank
     * @param corePoolSize  core size; non-positive values fall back to the default of 1
     * @param maxPoolSize   maximum size; non-positive values fall back to the core size
     * @param keepAliveTime idle keep-alive; non-positive values fall back to the default of 30
     * @param timeUnit      unit of {@code keepAliveTime}; {@code null} means seconds
     * @param workQueue     queue handed to the new executor
     * @param handler       rejection handler handed to the new executor
     * @return the executor registered under {@code threadPoolKey}
     * @throws IllegalArgumentException if {@code threadPoolKey} is blank
     */
    public static ThreadPoolExecutor newThreadPoolExecutor(String threadPoolKey, int corePoolSize,
            int maxPoolSize, int keepAliveTime, TimeUnit timeUnit,
            BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        if (StrUtils.isBlank(threadPoolKey)) {
            throw new IllegalArgumentException("threadPoolKey is null");
        }
        final int effectiveCore = corePoolSize <= 0 ? DEFAULT_CORE_POOL_SIZE : corePoolSize;
        final int effectiveMax = maxPoolSize <= 0 ? effectiveCore : maxPoolSize;
        final int effectiveKeepAlive = keepAliveTime <= 0 ? DEFAULT_KEEP_ALIVE_TIME : keepAliveTime;
        final TimeUnit effectiveUnit = timeUnit == null ? TimeUnit.SECONDS : timeUnit;
        // computeIfAbsent makes lookup-or-create atomic, so two concurrent callers using the same
        // key can never end up with two different pools (one of which would leak).
        return THREAD_POOL_EXECUTOR.computeIfAbsent(threadPoolKey, key -> {
            log.info("before new threadPool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", key, corePoolSize, maxPoolSize, keepAliveTime, timeUnit);
            ThreadPoolExecutor executor = new PThreadPoolExecutor(key, effectiveCore, effectiveMax, effectiveKeepAlive, effectiveUnit, workQueue, newThreadFactory(key), handler);
            log.info("after new thread pool, threadPoolKey:{}, corePoolSize:{}, maxPoolSize:{}, keepAliveTime:{}, timeUnit:{}", key, effectiveCore, effectiveMax, effectiveKeepAlive, effectiveUnit);
            return executor;
        });
    }

    /**
     * Returns the shared scheduled executor (two core threads), creating and registering it on
     * first use.
     *
     * @return the shared scheduled executor
     */
    public static ScheduledExecutorService getCommonScheduleExecutorsPool() {
        return (ScheduledThreadPoolExecutor) THREAD_POOL_EXECUTOR.computeIfAbsent(
                COMMON_SCHEDULE_EXECUTOR_POOL_KEY,
                key -> new ScheduledThreadPoolExecutor(2, newThreadFactory(key)));
    }

    /**
     * Builds a factory producing daemon threads named {@code <threadPrefix>-<n>}, where {@code n}
     * counts the threads created by the returned factory, starting at 1.
     *
     * @param threadPrefix name prefix shared by all threads of the factory
     * @return a new thread factory
     */
    public static ThreadFactory newThreadFactory(String threadPrefix) {
        // A per-factory counter keeps thread names distinct in logs and thread dumps.
        final AtomicInteger sequence = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(threadPrefix + "-" + sequence.incrementAndGet());
            thread.setDaemon(true);
            log.info("new thread:{}", thread.getName());
            return thread;
        };
    }

    /** Returns the pool key of a {@link PThreadPoolExecutor}, or an empty string for any other executor. */
    private static String poolKeyOf(ThreadPoolExecutor executor) {
        return executor instanceof PThreadPoolExecutor ? ((PThreadPoolExecutor) executor).getThreadPoolKey() : "";
    }

    /** Caller-runs policy that logs the rejected task before delegating to the JDK implementation. */
    static class CallerRunsPolicy extends ThreadPoolExecutor.CallerRunsPolicy {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            log.warn("CallRuns theadKey:{}, reject hashCode:{}, thread:{}", poolKeyOf(e), r.hashCode(), r.getClass().getSimpleName());
            super.rejectedExecution(r, e);
        }
    }

    /**
     * Rejection policy that blocks the submitting thread for a fixed delay and then resubmits the
     * rejected task. If the executor has been shut down the task is dropped after being logged.
     *
     * <p>Note: this slows down the submitting thread. Each failed resubmission re-enters this
     * handler from within {@code execute}, so a pool that stays saturated for a long time deepens
     * the submitting thread's call stack.
     */
    static class DiscardSynchronousQueueWaitPolicy implements RejectedExecutionHandler {

        /** Delay before a rejected task is resubmitted, in milliseconds; always at least 1. */
        private final long discardContinueWait;

        /**
         * @param discardContinueWait delay before resubmitting, in milliseconds; non-positive
         *                            values are replaced by 1
         */
        public DiscardSynchronousQueueWaitPolicy(long discardContinueWait) {
            this.discardContinueWait = discardContinueWait <= 0 ? 1 : discardContinueWait;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.warn("Discard threadKey:{}, reject hashCode:{}, thread:{}", poolKeyOf(executor), r.hashCode(), r.getClass().getSimpleName());
            if (executor.isShutdown()) {
                return;
            }
            boolean interrupted = false;
            try {
                // Plain sleep on purpose: polling the work queue to pass the time would consume
                // (and silently lose) a task that another thread is handing to the pool.
                TimeUnit.MILLISECONDS.sleep(discardContinueWait);
            } catch (InterruptedException e) {
                interrupted = true;
                log.error("rejectedExecution", e);
            }
            try {
                executor.execute(r);
            } finally {
                if (interrupted) {
                    // Re-assert the interrupt only after the retry so the retry itself can still wait.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * {@link ThreadPoolExecutor} that remembers its registry key, logs pool statistics on every
     * submission and logs task failures.
     */
    static class PThreadPoolExecutor extends ThreadPoolExecutor {

        private final String threadPoolKey;

        public PThreadPoolExecutor(String threadPoolKey, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            this.threadPoolKey = threadPoolKey;
        }

        /**
         * Submits the task. Exceptions thrown by the submission itself (for example from the
         * rejection handler) are logged and not propagated to the caller.
         */
        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (Exception e) {
                log.error("execute error.", e);
            }
            log.debug("execute runnable, hashCode:{}, threadPoolKey:{}, poolSize:{}, largestPoolSize:{}, activeCount:{}, taskCount:{}, completedTaskCount:{}, queueSize:{}",
                    command.hashCode(), threadPoolKey, this.getPoolSize(), this.getLargestPoolSize(), this.getActiveCount(), this.getTaskCount(), this.getCompletedTaskCount(), this.getQueue().size());
        }

        /** Logs the failure when a task terminated with a throwable. */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            if (t != null) {
                log.error("execute Runnable error, hashCode:{}", r.hashCode(), t);
            }
            super.afterExecute(r, t);
        }

        /** @return the registry key this executor was created with */
        public String getThreadPoolKey() {
            return threadPoolKey;
        }
    }
}
原文地址:https://blog.csdn.net/dashalen/article/details/140662872
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!