Java并发知识(JUC包)表格总结
Java的java.util.concurrent
(JUC)包是Java并发编程的核心包,提供了广泛的并发工具,用于创建线程安全的程序。表格总结的可能没有面面俱到,不过常用的一些基本上会总结到的,在项目实践中可以用,主要是作为复习之用。
目录
1. Executor框架
组件 | 描述 | 示例代码 |
---|---|---|
Executor | 顶层接口,用于将任务提交给线程池执行。 | Executor executor = Executors.newFixedThreadPool(3); |
ExecutorService | 接口,扩展了Executor,提供了管理线程池的方法。 | ExecutorService service = Executors.newFixedThreadPool(3); |
Executors | 工厂类,用于创建预定义配置的线程池。 | Executors.newFixedThreadPool(3); |
ThreadPoolExecutor | 可定制的线程池实现。 | new ThreadPoolExecutor(...); |
ScheduledExecutorService | 用于延迟执行或定期执行任务的线程池接口。 | ScheduledExecutorService service = Executors.newScheduledThreadPool(3); |
Future | 表示异步计算的结果,可用于查询计算是否完成,等待计算结果。 | Future<?> future = executor.submit(()->{/* task */}); |
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务给线程池
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task " + finalI + " is running.");
});
}
// 关闭线程池,不再接受新任务,已提交的任务将完成后关闭
executor.shutdown();
try {
// 等待线程池关闭,最多等待1分钟
if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
// 如果超时,则强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
// 线程被中断时的处理
executor.shutdownNow();
}
}
}
注意:这时示例代码,在项目实践中不建议直接使用Executors.newFixedThreadPool(3)这样的自动方式创建线程池,因为会有潜在的OOM的风险,而是推荐自定义线程池
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-async-%d").build();
ExecutorService executor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
2.同步器
同步器是用于控制并发任务之间协调的组件,它们可以用于同步线程的执行,确保线程安全,以及实现复杂的线程间协作。
类型/接口 | 描述 | 示例代码 |
---|---|---|
CountDownLatch | 一个同步辅助类,允许一个或多个线程等待一组操作完成。 | CountDownLatch latch = new CountDownLatch(1); |
CyclicBarrier | 类似于CountDownLatch,但可以重复使用。 | CyclicBarrier barrier = new CyclicBarrier(3); |
Semaphore | 一个计数信号量,用于控制同时访问特定资源的线程数量。 | Semaphore semaphore = new Semaphore(3); |
Exchanger | 一个同步辅助类,用于两个线程之间的数据交换。 | Exchanger<String> exchanger = new Exchanger<>(); |
Phaser | 一个更灵活的同步辅助类,用于线程之间的协调。 | Phaser phaser = new Phaser(); |
示例代码:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SynchronizerExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建一个CyclicBarrier,用于3个线程之间的同步
CyclicBarrier barrier = new CyclicBarrier(3);
// 提交任务给线程池
for (int i = 0; i < 3; i++) {
final int taskNumber = i;
executor.execute(() -> {
System.out.println("Thread " + taskNumber + " is running.");
try {
// 当一个线程到达屏障点时,等待其他线程
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Thread " + taskNumber + " has passed the barrier.");
});
}
// 关闭线程池
executor.shutdown();
}
}
这个示例展示了如何使用CyclicBarrier
来同步多个线程,确保它们在某个点上都达到了同步状态。每个线程在执行到屏障点时会调用barrier.await()
,这会导致线程阻塞,直到所有线程都到达屏障点,然后它们会同时继续执行
3.锁框架
在多线程环境中,锁是确保线程安全的关键机制。JUC包提供了多种锁的实现,用于控制对共享资源的访问。
锁类型 | 描述 | 示例代码 |
---|---|---|
Lock | 顶层接口,提供了比synchronized更灵活的锁操作。 | Lock lock = new ReentrantLock(); |
ReentrantLock | 可重入锁,实现了Lock接口,提供了公平和非公平锁的选项。 | ReentrantLock lock = new ReentrantLock(true); |
ReadWriteLock | 读写锁,允许多个读操作同时进行,但写操作是排他的。 | ReadWriteLock rwLock = new ReentrantReadWriteLock(); |
StampedLock | 一种新型的读写锁,相比ReadWriteLock,它提供了更多的灵活性。 | StampedLock stampedLock = new StampedLock(); |
示例代码:
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockExample {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock(); // 获取锁
try {
// 临界区:在这个区域内,只有当前线程可以执行
System.out.println("Critical section: only one thread can execute here.");
} finally {
lock.unlock(); // 释放锁
}
}
}
在这个示例中,ReentrantLock
被用来创建一个锁,以确保临界区内的代码只能由一个线程执行。lock()
方法用于获取锁,而unlock()
方法用于释放锁。注意,示例中使用了try-finally
结构来确保即使在发生异常的情况下,锁也能被正确释放。
4.原子变量
原子变量是JUC包中提供的一种线程安全的变量,它们利用了底层的原子操作来保证在多线程环境下的安全性,而无需使用锁。
类型/接口 | 描述 | 示例代码 |
---|---|---|
AtomicInteger | 一个使用int 值的原子变量。 | AtomicInteger atomicInt = new AtomicInteger(); |
AtomicLong | 一个使用long 值的原子变量。 | AtomicLong atomicLong = new AtomicLong(); |
AtomicReference | 一个使用任意引用类型的原子变量。 | AtomicReference<String> atomicRef = new AtomicReference<>(); |
AtomicBoolean | 一个使用boolean 值的原子变量。 | AtomicBoolean atomicBool = new AtomicBoolean(); |
这些原子变量类提供了多种原子操作,如增加、减少、设置值、比较并设置等。
示例代码:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AtomicExample {
public static void main(String[] args) {
// 创建一个原子整数值
AtomicInteger counter = new AtomicInteger(0);
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交增加操作给线程池
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
for (int j = 0; j < 100; j++) {
counter.incrementAndGet(); // 原子增加操作
}
});
}
// 关闭线程池并等待所有任务完成
executor.shutdown();
while (!executor.isTerminated()) {
// 等待线程池关闭
}
// 输出最终的计数值
System.out.println("Final counter value: " + counter.get());
}
}
在这个示例中,AtomicInteger
被用来在多个线程中安全地累加数值。每个线程提交了一个增加操作给线程池,通过incrementAndGet()
方法原子地增加计数器的值。
5.并发集合
并发集合是Java并发包中提供的线程安全的集合类,它们提供了比同步包装器(如Collections.synchronizedList
)更高效的并发访问。
集合类型 | 描述 | 示例代码 |
---|---|---|
ConcurrentHashMap | 线程安全的HashMap实现。 | ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); |
ConcurrentLinkedQueue | 高效的非阻塞线程安全队列。 | ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); |
ConcurrentSkipListMap | 跳表实现的线程安全有序映射。 | ConcurrentSkipListMap<String, String> skipMap = new ConcurrentSkipListMap<>(); |
CopyOnWriteArrayList | 线程安全的ArrayList,适用于读多写少的场景。 | CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); |
CopyOnWriteArraySet | 基于CopyOnWriteArrayList的线程安全Set实现。 | CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>(); |
示例代码:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentCollectionExample {
public static void main(String[] args) {
// 创建线程安全的ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务给线程池,更新map中的值
for (int i = 0; i < 3; i++) {
final String key = "Key" + i;
executor.submit(() -> {
// 使用原子方法更新map中的值
map.compute(key, (k, v) -> {
if (v == null) v = 0;
return v + 1; // 原子增加操作
});
});
}
// 关闭线程池并等待所有任务完成
executor.shutdown();
while (!executor.isTerminated()) {
// 等待线程池关闭
}
// 输出map中的值
map.forEach((k, v) -> System.out.println(k + ": " + v));
}
}
在这个示例中,ConcurrentHashMap
被用来在多个线程中安全地更新键值对。通过compute
方法,可以原子地更新map中的值,而不需要额外的同步措施。
6.线程池
线程池是一种执行器(Executor),用于在一个后台线程中执行任务。线程池的主要目的是减少在创建和销毁线程时所产生的性能开销。Java提供了多种线程池的实现,允许开发者根据需要选择合适的线程池。
线程池类型 | 描述 | 示例代码 |
---|---|---|
ThreadPoolExecutor | 提供了大量可配置的选项,是最常用的线程池实现。 | new ThreadPoolExecutor(...); |
ScheduledThreadPoolExecutor | 用于延迟执行或定期执行任务的线程池。 | Executors.newScheduledThreadPool(3); |
FixedThreadPool | 拥有固定数量线程的线程池。 | Executors.newFixedThreadPool(3); |
SingleThreadExecutor | 只有一个线程的线程池,保证所有任务按顺序执行。 | Executors.newSingleThreadExecutor(); |
CachedThreadPool | 根据需要创建新线程的线程池,对于短生命周期的异步任务非常合适。 | Executors.newCachedThreadPool(); |
示例代码:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
// 提交任务给线程池
for (int i = 0; i < 5; i++) {
int finalI = i;
executor.submit(() -> {
System.out.println("Task " + finalI + " is running on thread " + Thread.currentThread().getName());
});
}
// 关闭线程池,不再接受新任务
executor.shutdown();
try {
// 等待线程池关闭,最多等待1分钟
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// 如果超时,则强制关闭
executor.shutdownNow();
}
} catch (InterruptedException e) {
// 线程被中断时的处理
executor.shutdownNow();
}
}
}
在这个示例中,我们创建了一个固定大小为3的ThreadPoolExecutor
实例,并提交了5个任务。这些任务将由线程池中的线程执行。通过调用shutdown()
方法,线程池将不再接受新任务,并且会等待已提交的任务执行完成。如果等待超过60秒,线程池将被强制关闭。
线程池更详细的内容可进一步查看我的另一篇博客:https://blog.csdn.net/u011174699/article/details/110296957?spm=1001.2014.3001.5502
7.Future和Callable接口
Future
接口和Callable
接口通常与线程池一起使用,用于异步执行任务并获取结果。
接口/类 | 描述 | 示例代码 |
---|---|---|
Callable | 一个接口,允许在其call 方法中抛出异常,并且可以返回结果。 | Callable<String> task = () -> { return "Done"; }; |
Future | 一个接口,代表异步计算的结果。它提供了检查计算是否完成的方法,以及等待计算结果的方法。 | Future<String> future = executor.submit(task); |
FutureTask | 一个类,实现了Future 接口,并且可以作为任务提交给线程池。 | FutureTask<String> futureTask = new FutureTask<>(task); |
Future
的主要方法包括:
boolean cancel(boolean mayInterruptIfRunning)
: 尝试取消执行。boolean isCancelled()
: 是否已经被取消。boolean isDone()
: 任务是否完成。V get()
: 获取任务的结果。V get(long timeout, TimeUnit unit)
: 在指定的时间内获取任务的结果。
示例代码:
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 创建Callable任务
Callable<String> callable = () -> {
// 模拟任务执行时间
Thread.sleep(1000);
return "Callable task is done";
};
// 提交Callable任务到线程池,并返回Future对象
Future<String> future = executorService.submit(callable);
// 在主线程中执行其他任务,同时等待Callable任务完成
System.out.println("Main thread is doing other work");
// 获取Callable任务的结果
String result = future.get();
System.out.println(result);
// 关闭线程池
executorService.shutdown();
}
}
在这个示例中,我们创建了一个Callable
任务,并将其提交给线程池。通过Future
对象,我们能够获取任务的结果。Future.get()
方法会阻塞当前线程,直到任务完成并返回结果。
8.同步队列(BlockingQueue
)
BlockingQueue
是一个线程安全的队列,用于在多个线程之间安全地传输数据。当生产者线程尝试添加一个元素到已满的队列时,它将被阻塞,直到队列中有足够的空间。同样,当消费者线程尝试从一个空队列中取一个元素时,它也将被阻塞,直到队列中有元素可用。
接口/类 | 描述 | 示例代码 |
---|---|---|
BlockingQueue | 顶层接口,定义了阻塞队列的基本操作。 | BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); |
ArrayBlockingQueue | 由数组支持的有界阻塞队列。 | ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); |
LinkedBlockingQueue | 由链表支持的可选容量的阻塞队列。 | LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); |
PriorityBlockingQueue | 无界优先级阻塞队列。 | PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10); |
SynchronousQueue | 不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作。 | SynchronousQueue<Integer> queue = new SynchronousQueue<>(); |
BlockingQueue
的主要方法包括:
void put(E e)
: 添加一个元素到队列,如果队列满了,则阻塞。E take()
: 移除并获取队列头部的元素,如果队列为空,则阻塞。E poll(long timeout, TimeUnit unit)
: 如果可能,不阻塞地获取队列头部的元素,如果指定时间内队列为空,则返回null
。E poll()
: 如果可能,不阻塞地获取队列头部的元素,如果队列为空,则返回null
。
示例代码:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个有界阻塞队列
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 将元素添加到队列
queue.put(i);
System.out.println("Produced: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
// 从队列中取出元素
int number = queue.take();
System.out.println("Consumed: " + number);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 启动生产者和消费者线程
producer.start();
consumer.start();
// 等待生产者和消费者线程结束
producer.join();
consumer.join();
}
}
在这个示例中,我们创建了一个容量为5的LinkedBlockingQueue
,并启动了一个生产者线程和一个消费者线程。生产者线程将数字添加到队列中,而消费者线程从队列中取出数字。由于队列是阻塞的,所以当生产者线程尝试添加元素到已满的队列时,它将被阻塞,直到队列中有空间。同样,消费者线程在尝试从一个空队列中取出元素时也会被阻塞。
9.条件对象(Condition)
在多线程编程中,条件对象允许线程等待一个条件成立,或者等待另一个线程通知。java.util.concurrent.locks
包中的Condition
接口提供了这样的功能,它与Lock
接口一起使用,提供了比Object.wait()
和Object.notify()
更灵活的线程间协调机制。
接口/类 | 描述 | 示例代码 |
---|---|---|
Condition | 顶层接口,提供了线程等待和唤醒机制。 | Condition condition = lock.newCondition(); |
Lock | 顶层接口,提供了比synchronized更灵活的锁操作,并且可以与Condition一起使用。 | Lock lock = new ReentrantLock(); |
ReentrantLock | 实现了Lock接口的锁,可以与Condition一起使用。 | ReentrantLock lock = new ReentrantLock(); |
Condition
的主要方法包括:
void await()
: 导致当前线程等待,直到它被另一个线程唤醒。void awaitUninterruptibly()
: 与await()
类似,但不响应中断。long awaitNanos(long nanosTimeout)
: 在指定的时间内等待条件成立。boolean await(long time, TimeUnit unit)
: 在指定的时间内等待条件成立,并返回条件是否在超时前成立。void signal()
: 唤醒一个等待在该条件上的线程。void signalAll()
: 唤醒所有等待在该条件上的线程。
示例代码:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample {
public static void main(String[] args) {
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
// 线程A
new Thread(() -> {
lock.lock();
try {
// 等待条件成立
condition.await();
System.out.println("Thread A: Condition was signaled.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}).start();
// 线程B
new Thread(() -> {
lock.lock();
try {
// 模拟条件成立前的准备工作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 通知等待的线程条件已经成立
condition.signal();
} finally {
lock.unlock();
}
}).start();
}
}
在这个示例中,线程A等待一个条件成立,而线程B在模拟了一些工作之后,通过调用condition.signal()
来通知线程A条件已经成立。线程A随后被唤醒并继续执行。
10.Fork/Join框架
Fork/Join框架是一种用于并行执行任务的框架,特别适合于可以被分解为小任务的递归算法,如排序、搜索和数学计算。它通过将任务分割成更小的任务并行执行,然后在适当的时候合并结果,从而提高性能。
类型/类 | 描述 | 示例代码 |
---|---|---|
ForkJoinTask | 表示一个可以并行执行的任务。 | ForkJoinTask<Void> task = new RecursiveTask<Void>() { ... }; |
RecursiveTask | 继承自ForkJoinTask的抽象类,用于需要执行递归操作的任务。 | class MyRecursiveTask extends RecursiveTask<Integer> { ... } |
ForkJoinPool | 用于执行ForkJoinTask的线程池。 | ForkJoinPool pool = new ForkJoinPool(); |
ForkJoinPool的主要方法包括:
invoke(task)
: 启动一个任务并等待其完成。submit(task)
: 提交一个任务并返回一个Future对象。execute(task)
: 执行一个任务,但不等待其完成。
示例代码:
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class ForkJoinExample {
static class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 20;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long sum = 0;
if (end - start < THRESHOLD) {
// 如果数组的这部分足够小,直接计算
for (int i = start; i < end; i++) {
sum += array[i];
}
} else {
// 否则,将任务分成两部分
int middle = (start + end) / 2;
SumTask subtask1 = new SumTask(array, start, middle);
SumTask subtask2 = new SumTask(array, middle, end);
subtask1.fork(); // 异步执行第一个子任务
subtask2.fork(); // 异步执行第二个子任务
sum = subtask1.join() + subtask2.join(); // 等待两个子任务完成并合并结果
}
return sum;
}
}
public static void main(String[] args) {
int[] array = { ... }; // 假设这是我们的数组
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
long result = pool.invoke(task); // 启动任务并等待结果
System.out.println("Sum: " + result);
}
}
在这个示例中,我们创建了一个SumTask
类,它继承自RecursiveTask
,用于并行计算数组的和。当任务足够小的时候,我们直接计算它的和;否则,我们将任务分割成两个子任务并行执行它们,然后合并结果。
11.CompletableFuture异步编程
CompletableFuture
是一个在Java 8中引入的类,它提供了一种异步编程的模型,允许开发者以声明式的方式编写异步代码。它封装了一个异步操作,并允许开发者添加回调函数来处理操作的结果。
类型/类 | 描述 | 示例代码 |
---|---|---|
CompletableFuture | 表示一个可能还没有完成的异步操作。 | CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Done"; }); |
CompletionStage | 一个接口,表示一个异步操作的阶段,可以有返回值或没有。 | CompletionStage<String> stage = future.thenApply(s -> s + " Stage"); |
CompletableFuture
的主要方法包括:
supplyAsync(Supplier)
: 提交一个异步的任务。thenApply(Function)
: 当前操作完成后,应用一个函数。thenAccept(Consumer)
: 当前操作完成后,执行一个动作。thenRun(Runnable)
: 当前操作完成后,运行一个动作。thenCombine(CompletionStage, BiFunction)
: 将两个CompletionStage
的结果合并。thenAcceptBoth(CompletionStage, BiConsumer)
: 将当前操作的结果与其他操作的结果结合。allOf(CompletableFuture...)
: 等待多个CompletableFuture
完成。anyOf(CompletableFuture...)
: 等待多个CompletableFuture
中的任何一个完成。
示例代码:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExample {
public static void main(String[] args) {
// 创建一个CompletableFuture任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟长时间运行的任务
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Task is done";
});
// 为CompletableFuture添加回调
future.thenAccept(result -> {
System.out.println(result);
});
// 等待任务完成(阻塞调用)
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
在这个示例中,我们创建了一个异步执行的CompletableFuture
任务,并为其添加了一个回调,当任务完成时,回调会被触发并打印结果。然后,我们使用get()
方法等待任务完成,这个方法是阻塞的。
这个在项目中是很常用的,尤其用于提升接口响应速度特别好用,特别是那种需要多个查询并汇总数据后返回的接口。
12.Phaser多线程协调
Phaser
是一个同步辅助类,用于协调多个线程在某些阶段的执行。它允许线程在不同的“阶段”(phase)中等待,直到到达某个安全点(barrier)才开始执行下一个阶段的操作。
类型/类 | 描述 | 示例代码 |
---|---|---|
Phaser | 用于多线程协调的类,允许线程在不同阶段同步。 | Phaser phaser = new Phaser(); |
Phaseer
的主要方法包括:
int getPhase()
: 返回当前阶段的编号。boolean arrive()
: 当前线程到达,增加到达数。boolean arriveAndAwaitAdvance()
: 当前线程到达并且等待直到进入下一个阶段。boolean awaitAdvance(int phase)
: 等待直到进入指定的阶段。int getRegisteredParties()
: 返回注册的线程数量。
示例代码:
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("Phase " + phase + " is completed with " + registeredParties + " registered parties.");
return true; // 继续到下一个阶段
}
};
for (int i = 0; i < 3; i++) {
final int taskNumber = i;
new Thread(() -> {
System.out.println("Task " + taskNumber + " is running.");
phaser.arriveAndAwaitAdvance(); // 到达并等待下一个阶段
System.out.println("Task " + taskNumber + " is resuming after phase " + phaser.getPhase());
}).start();
}
}
}
在这个示例中,我们创建了一个Phaser
实例,并重写了onAdvance
方法来输出每个阶段完成的信息。每个任务线程通过调用arriveAndAwaitAdvance()
来注册自己并等待其他任务到达,直到所有任务都到达后,它们将一起进入下一个阶段。
13.StampedLock读写锁
StampedLock
是Java 8引入的一种新的锁机制,用于提供一种更高效的读写锁。与ReadWriteLock
相比,StampedLock
提供了更细粒度的控制,并且可以避免写饥饿的问题。
类型/类 | 描述 | 示例代码 |
---|---|---|
StampedLock | 一种新的读写锁,提供了乐观读锁和悲观写锁。 | StampedLock stampedLock = new StampedLock(); |
StampededLock
的主要方法包括:
long tryOptimisticRead()
: 尝试乐观读锁,适用于读操作远多于写操作的场景。long readLock()
: 获取读锁。long tryReadLock()
: 尝试获取读锁。void unlock(long stamp)
: 释放锁,stamp是获取锁时返回的标记。long writeLock()
: 获取写锁。long tryWriteLock()
: 尝试获取写锁。
示例代码
import java.util.concurrent.locks.StampedLock;
public class StampedLockExample {
private static final StampedLock stampedLock = new StampedLock();
private static final int[] data = new int[100];
public static void main(String[] args) {
// 启动读操作线程
Thread reader = new Thread(() -> {
long stamp = stampedLock.tryOptimisticRead();
// 执行读操作
int sum = 0;
for (int value : data) {
sum += value;
}
// 验证读操作期间是否发生了写
if (stampedLock.validate(stamp)) {
System.out.println("Sum (optimistic read): " + sum);
} else {
// 如果发生了写,重新获取读锁
stamp = stampedLock.readLock();
try {
int sum2 = 0;
for (int value : data) {
sum2 += value;
}
System.out.println("Sum (re-read): " + sum2);
} finally {
stampedLock.unlock(stamp);
}
}
});
// 启动写操作线程
Thread writer = new Thread(() -> {
long stamp = stampedLock.writeLock();
try {
// 执行写操作
for (int i = 0; i < data.length; i++) {
data[i] = i;
}
System.out.println("Data initialized.");
} finally {
stampedLock.unlock(stamp);
}
});
reader.start();
writer.start();
}
}
在这个示例中,我们创建了一个StampedLock
实例,并使用它来同步对共享数据数组data
的读写操作。读线程尝试使用乐观读锁来计算数组的和,如果在计算过程中发生了写操作,它会重新获取读锁并再次计算。写线程则直接获取写锁来初始化数组。
14.LongAdder和DoubleAdder原子累加器
LongAdder
和DoubleAdder
是Java 8中引入的用于进行原子累加操作的类,它们适用于对long
和double
数值进行频繁的累加操作,尤其是在高并发场景下。与AtomicLong
相比,LongAdder
在大量更新时通常提供更好的性能,因为它使用一种细粒度的算法来减少冲突。
类型/类 | 描述 | 示例代码 |
---|---|---|
LongAdder | 一个用于long 数值的原子累加器。 | LongAdder adder = new LongAdder(); |
DoubleAdder | 一个用于double 数值的原子累加器。 | DoubleAdder adder = new DoubleAdder(); |
LongAdder
和DoubleAdder
的主要方法包括:
void add(long)
: 添加值到累加器。void add(double)
:DoubleAdder
中用于添加值。long sum()
: 返回累加器当前的值。long sumThenReset()
: 返回累加器当前的值并重置累加器为0。
示例代码:
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.LongAdder;
public class AdderExample {
private static final LongAdder adder = new LongAdder();
public static void main(String[] args) {
// 创建多个线程对共享的LongAdder进行累加操作
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
adder.add(ThreadLocalRandom.current().nextLong());
}
}).start();
}
// 等待所有线程完成累加
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 输出累加结果
System.out.println("Sum: " + adder.sum());
}
}
在这个示例中,我们创建了一个LongAdder
实例,并启动了多个线程对它进行累加操作。每个线程生成随机的长整型数字并累加到LongAdder
中。所有线程完成后,我们输出累加的结果
15.阻塞队列
类型/类 | 描述 | 示例代码 |
---|---|---|
ArrayBlockingQueue | 由数组支持的有界阻塞队列。 | ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); |
LinkedBlockingQueue | 由链表支持的可选容量的阻塞队列。 | LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); |
PriorityBlockingQueue | 无界优先级阻塞队列,按照自然顺序排序元素。 | PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10); |
DelayQueue | 无界阻塞队列,使用优先级队列,元素只有在达到指定的延迟时间后才可用。 | DelayQueue<Delayed> queue = new DelayQueue<>(); |
SynchronousQueue | 不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作。 | SynchronousQueue<Integer> queue = new SynchronousQueue<>(); |
通常与线程池结合使用,是自定义线程池时的其中的一个参数,与内置的几种自动创建线程池的对应关系
线程池 | 阻塞队列 |
FixedThreadPool | LinkedBlockingQueue |
SingleThreadExecutor | Linked BlockingQueue |
CachedThreadPool | SynchronousQueue |
ScheduledThreadPool | DelayedWorkQueue |
SingleThreadScheduled Executor | DelayedWorkQueue |
示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
// 创建一个固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 生产者线程
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
try {
// 将元素添加到队列
queue.put(i);
System.out.println("Produced: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 消费者线程
executor.submit(() -> {
for (int i = 0; i < 10; i++) {
try {
// 从队列中取出元素
int number = queue.take();
System.out.println("Consumed: " + number);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 关闭线程池
executor.shutdown();
}
}
在这个示例中,我们创建了一个ArrayBlockingQueue
实例,它是一个有界阻塞队列,然后创建了一个线程池。生产者线程向队列中添加数字,而消费者线程从队列中取出数字。由于队列是阻塞的,所以当生产者尝试添加元素到已满的队列时,它将被阻塞,直到队列中有空间。同样,消费者在尝试从一个空队列中取出元素时也会被阻塞。
16.线程局部变量
线程局部变量(ThreadLocal)是一种特殊的变量,每个线程都独立拥有一份副本,线程之间的副本互不影响。ThreadLocal
类提供了线程局部变量的实现,非常适合用于存储那些需要线程隔离的信息,比如事务ID、用户会话等。
类型/类 | 描述 | 示例代码 |
---|---|---|
ThreadLocal | 线程局部变量,为每个线程提供独立的变量副本。 | ThreadLocal<Integer> threadLocal = new ThreadLocal<>(); |
ThreadLocal
的主要方法包括:
void set(T value)
: 为当前线程设置线程局部变量的值。T get()
: 获取当前线程的线程局部变量副本的值。remove()
: 移除当前线程的线程局部变量副本。
示例代码:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 提交任务到线程池
executorService.submit(() -> {
// 为当前线程设置ThreadLocal的值
threadLocal.set(1);
System.out.println("ThreadLocal value: " + threadLocal.get());
});
executorService.submit(() -> {
// 尝试获取ThreadLocal的值,预期为null,因为每个线程有自己的副本
System.out.println("ThreadLocal value: " + threadLocal.get());
});
// 关闭线程池
executorService.shutdown();
}
}
在这个示例中,我们创建了一个ThreadLocal
实例,并在两个不同的线程中对它进行操作。第一个线程通过调用set()
方法为ThreadLocal
设置了一个值,然后通过调用get()
方法获取并打印了这个值。第二个线程尝试获取ThreadLocal
的值,但由于每个线程拥有自己的副本,它将得到null
。
17.CompletionService
CompletionService
是一个接口,它将任务的异步执行与线程池结合在一起,并提供了一种机制,允许你顺序地获取任务的结果。CompletionService
通常与Executor
(如ThreadPoolExecutor
或ScheduledThreadPoolExecutor
)一起使用,可以按任务提交的顺序检索任务的结果。
方法 | 描述 |
---|---|
Future take() | 检索下一个已完成的任务,如果没有任务完成则阻塞。 |
Future poll(long timeout, TimeUnit unit) | 在给定的时间内等待并检索下一个已完成的任务,如果超时则返回null 。 |
Future poll() | 不阻塞地检索下一个已完成的任务,如果没有任务完成则返回null 。 |
示例代码:
import java.util.concurrent.*;
public class CompletionServiceExample {
public static void main(String[] args) {
// 创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
// 提交任务到CompletionService
completionService.submit(() -> "Task 1 completed");
completionService.submit(() -> "Task 2 completed");
completionService.submit(() -> "Task 3 completed");
// 按顺序获取任务结果
try {
Future<String> future1 = completionService.take(); // 阻塞直到下一个任务完成
System.out.println(future1.get());
Future<String> future2 = completionService.poll(2, TimeUnit.SECONDS); // 2秒超时
if (future2 != null) {
System.out.println(future2.get());
}
Future<String> future3 = completionService.poll();
if (future3 != null) {
System.out.println(future3.get());
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭线程池
executorService.shutdown();
}
}
}
在这个示例中,我们创建了一个ExecutorService
和基于它的CompletionService
。然后,我们提交了三个任务到CompletionService
。使用take()
方法,我们可以阻塞地等待并获取第一个完成的任务的结果。使用poll()
方法,我们可以非阻塞地获取任务结果,或者在指定时间内等待。
请注意,在实际使用中,通常不会像示例中那样直接提交已经完成的任务,而是提交那些需要由ExecutorService
异步执行的任务。这里的示例只是为了展示CompletionService
的使用方法。
具体的实践案例可以参考我的另一篇博客:
使用CompletionService进行多个文件打包为zip下载_hutool zip 打包-CSDN博客
暂且总结这些,后续应该还会再补充一些细节的内容。
原文地址:https://blog.csdn.net/u011174699/article/details/138256471
免责声明:本站文章内容转载自网络资源,如侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!