JDK 并发编程工具类详解:CountDownLatch、Semaphore、Exchanger、CyclicBarrier 和 Phaser
在 Java 并发编程中,JDK 提供了一些强大的工具类来帮助开发者处理线程间的同步和通信问题。这些工具类主要包括 CountDownLatch
、Semaphore
、Exchanger
、CyclicBarrier
和 Phaser
,它们都位于 java.util.concurrent
包下,统称为 JUC(Java Util Concurrent)工具类。本文将详细介绍这些工具类的作用、使用方法和原理。
1 工具类概览
类名 | 作用 |
---|---|
Semaphore | 限制线程的数量,控制并发访问的线程数 |
Exchanger | 两个线程之间交换数据 |
CountDownLatch | 线程等待直到计数器减为 0 时开始工作 |
CyclicBarrier | 类似于 CountDownLatch ,但可以重复使用 |
Phaser | 增强的 CyclicBarrier ,支持动态调整参与线程数和多阶段同步 |
2 Semaphore:信号量
Semaphore
是 Java 并发编程中用于控制并发访问线程数量的工具类。它的名字来源于“信号”(Semaphore),意味着它通过信号来控制线程的访问。Semaphore
维护了一个计数器(permit
),线程在访问资源前需要获取一个 permit
,访问结束后释放 permit
。通过这种方式,Semaphore
可以有效地限制同时访问某个资源的线程数量。
2.1 构造方法
Semaphore
提供了两种构造方法:
- 非公平模式:默认情况下,
Semaphore
是非公平的,即线程获取permit
的顺序不保证是 FIFO(先进先出)。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
- 公平模式:可以通过传入
true
来启用公平模式,确保线程获取permit
的顺序是 FIFO。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
2.2 主要方法
Semaphore
的主要方法包括:
acquire()
:获取一个permit
。如果没有可用permit
,线程将阻塞直到有permit
可用。release()
:释放一个permit
。acquire(int permits)
:获取多个permit
。release(int permits)
:释放多个permit
。
每次调用 acquire
方法,permit
计数器会减少一个或多个。如果计数器减少到 0,再有线程调用 acquire
,该线程将阻塞,直到有其他线程调用 release
释放 permit
。
2.3 使用案例
Semaphore
常用于资源有限的场景中,限制线程的数量。例如,限制同时只能有 3 个线程在工作:
public class SemaphoreDemo {
static class MyThread implements Runnable {
private int value;
private Semaphore semaphore;
public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire(); // 获取 permit
System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
value, semaphore.availablePermits(), semaphore.getQueueLength()));
// 睡眠随机时间,打乱释放顺序
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("线程%d释放了资源", value));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放 permit
}
}
}
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}
输出示例:
当前线程是 1, 还剩 2 个资源,还有 0 个线程在等待
当前线程是 0, 还剩 1 个资源,还有 0 个线程在等待
当前线程是 6, 还剩 0 个资源,还有 0 个线程在等待
线程 6 释放了资源
当前线程是 2, 还剩 0 个资源,还有 6 个线程在等待
线程 2 释放了资源
当前线程是 4, 还剩 0 个资源,还有 5 个线程在等待
线程 0 释放了资源
当前线程是 7, 还剩 0 个资源,还有 4 个线程在等待
线程 1 释放了资源
当前线程是 8, 还剩 0 个资源,还有 3 个线程在等待
线程 7 释放了资源
当前线程是 5, 还剩 0 个资源,还有 2 个线程在等待
线程 4 释放了资源
当前线程是 3, 还剩 0 个资源,还有 1 个线程在等待
线程 8 释放了资源
当前线程是 9, 还剩 0 个资源,还有 0 个线程在等待
线程 9 释放了资源
线程 5 释放了资源
线程 3 释放了资源
在这个示例中,最开始是 1, 0, 6 这三个线程获得了资源,其他线程进入了等待队列。当某个线程释放资源后,等待队列中的线程会获得资源。
2.4 其他方法
Semaphore
还提供了一些其他方法,用于处理中断或避免阻塞:
acquireUninterruptibly()
:忽略中断,获取一个permit
。acquireUninterruptibly(int permits)
:忽略中断,获取多个permit
。tryAcquire()
:尝试获取一个permit
,如果成功则返回true
,否则返回false
,不进入阻塞队列。tryAcquire(int permits)
:尝试获取多个permit
。tryAcquire(int permits, long timeout, TimeUnit unit)
:在指定时间内尝试获取多个permit
。tryAcquire(long timeout, TimeUnit unit)
:在指定时间内尝试获取一个permit
。
2.5 原理
Semaphore
内部通过继承 AQS
(AbstractQueuedSynchronizer)的 Sync
类来实现。Sync
类重写了 tryAcquireShared
方法,用于尝试获取资源。如果获取失败(即请求的资源数量小于当前可用资源数量),方法会返回一个负数,表示获取资源失败,当前线程将进入 AQS
的等待队列。
当有线程调用 release
方法释放 permit
时,AQS
会唤醒等待队列中的线程,使其有机会再次尝试获取资源。
3 Exchanger:数据交换器
Exchanger
是 Java 并发编程中用于两个线程之间交换数据的工具类。它支持泛型,因此可以在两个线程之间传送任何类型的数据。Exchanger
的特点是,当一个线程调用 exchange
方法后,会进入阻塞状态,直到另一个线程也调用 exchange
方法,两个线程才会交换数据并继续执行。
3.1 基本使用
以下是一个简单的示例,展示了如何使用 Exchanger
在两个线程之间交换字符串数据:
public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
Thread.sleep(1000);
new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
输出示例:
这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据
在这个示例中,线程A调用 exchange
方法后进入阻塞状态,等待线程B调用 exchange
方法。当线程B调用 exchange
方法后,两个线程交换数据并继续执行。
3.2 超时机制
Exchanger
还提供了一个带有超时参数的 exchange
方法,如果在指定时间内没有另一个线程调用 exchange
方法,就会抛出 TimeoutException
异常。
public V exchange(V x, long timeout, TimeUnit unit)
3.3 多线程交换
需要注意的是,Exchanger
只能用于两个线程之间的数据交换。如果有三个线程调用同一个 Exchanger
实例的 exchange
方法,只有前两个线程会交换数据,第三个线程会进入阻塞状态,直到有另一个线程调用 exchange
方法。
3.4 重复使用
Exchanger
是可以重复使用的。也就是说,两个线程可以在内存中不断地交换数据。每次调用 exchange
方法都会进行一次数据交换。
3.5 原理
Exchanger
内部使用 park/unpark
机制来实现线程的等待和唤醒。在使用 park/unpark
方法之前,会通过 CAS
(Compare-And-Swap)检查,以提高性能。CAS
是一种乐观锁机制,用于确保线程安全。
3.6 应用场景
Exchanger
适用于以下场景:
- 基因算法:在基因算法中,两个线程可以交换基因片段。
- 流水线设计:在流水线设计中,两个线程可以交换中间结果。
- 数据同步:在需要两个线程之间同步数据的场景中,可以使用
Exchanger
进行数据交换。
4 CountDownLatch:倒计时门闩
CountDownLatch
是 Java 并发编程中用于线程同步的工具类。它的名字由两部分组成:CountDown
表示计数递减,Latch
表示“门闩”或“屏障”。CountDownLatch
的作用是让一个或多个线程等待其他线程完成一些前置任务,只有当所有前置任务完成后,等待的线程才能继续执行。
4.1 基本方法
CountDownLatch
提供了以下几个主要方法:
-
构造方法:
public CountDownLatch(int count)
构造方法中传入的
count
表示需要等待的前置任务数量。 -
await()
:public void await() throws InterruptedException
调用
await()
方法的线程会进入等待状态,直到count
值减为 0。 -
await(long timeout, TimeUnit unit)
:public boolean await(long timeout, TimeUnit unit) throws InterruptedException
带有超时参数的
await
方法,如果在指定时间内count
没有减为 0,线程会继续执行,并返回false
。 -
countDown()
:public void countDown()
每次调用
countDown()
方法,count
值会减 1。 -
getCount()
:public long getCount()
获取当前
count
的值。
4.2 使用案例
以下是一个模拟游戏加载前置任务的示例:
public class CountDownLatchDemo {
// 定义前置任务线程
static class PreTaskThread implements Runnable {
private String task;
private CountDownLatch countDownLatch;
public PreTaskThread(String task, CountDownLatch countDownLatch) {
this.task = task;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(task + " - 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
// 假设有三个模块需要加载
CountDownLatch countDownLatch = new CountDownLatch(3);
// 主任务
new Thread(() -> {
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
countDownLatch.await();
System.out.println("数据加载完成,正式开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
// 前置任务
new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
}
}
输出示例:
等待数据加载...
还有 3 个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!
在这个示例中,主线程会等待所有前置任务完成后,才继续执行游戏开始的逻辑。
4.3 原理
CountDownLatch
的实现原理相对简单,内部通过继承 AQS
(AbstractQueuedSynchronizer)的 Sync
类来实现。Sync
类重写了 tryAcquireShared
方法,用于尝试获取资源。当 count
值减为 0 时,所有等待的线程会被唤醒。
需要注意的是,CountDownLatch
的 count
值在构造时设置,且只能被设置一次。CountDownLatch
没有提供任何机制去重新设置这个计数值。
5 CyclicBarrier:循环屏障
CyclicBarrier
是 Java 并发编程中用于线程同步的工具类。它的名字由两部分组成:Cyclic
表示循环,Barrier
表示屏障。与 CountDownLatch
不同,CyclicBarrier
可以重复使用,并且可以在达到屏障时执行一个任务。
5.1 基本方法
CyclicBarrier
提供了以下几个主要方法:
-
构造方法:
public CyclicBarrier(int parties) public CyclicBarrier(int parties, Runnable barrierAction)
构造方法中传入的
parties
表示需要等待的线程数量。barrierAction
是一个可选的Runnable
对象,当所有线程到达屏障时会执行这个任务。 -
await()
:public int await() throws InterruptedException, BrokenBarrierException
调用
await()
方法的线程会进入等待状态,直到所有线程都调用了await()
方法。 -
await(long timeout, TimeUnit unit)
:public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
带有超时参数的
await
方法,如果在指定时间内没有达到屏障,线程会继续执行,并抛出TimeoutException
异常。 -
reset()
:public void reset()
重置屏障,允许重新使用。
-
isBroken()
:public boolean isBroken()
检查屏障是否被破坏。
5.2 使用案例
以下是一个模拟游戏关卡加载的示例:
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {
private String task;
private CyclicBarrier cyclicBarrier;
public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});
new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
}
}
输出示例:
关卡 1 的任务加载地图数据完成
关卡 1 的任务加载背景音乐完成
关卡 1 的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...
关卡 2 的任务加载地图数据完成
关卡 2 的任务加载背景音乐完成
关卡 2 的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...
关卡 3 的任务加载人物模型完成
关卡 3 的任务加载地图数据完成
关卡 3 的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏...
在这个示例中,每个关卡的前置任务完成后,所有线程会等待其他线程也完成任务,然后一起进入下一个关卡。
5.3 原理
CyclicBarrier
的实现原理与 CountDownLatch
不同。CyclicBarrier
内部使用 Lock
和 Condition
实现的等待/通知模式。当所有线程调用 await()
方法后,会触发屏障操作,执行构造方法中传入的 barrierAction
。
CyclicBarrier
的核心方法是 dowait(boolean timed, long nanos)
,它负责处理线程的等待和唤醒逻辑。当所有线程到达屏障时,会执行 barrierAction
,然后重置屏障,允许重新使用。
6 Phaser:阶段同步器
Phaser
是 Java 7 中引入的一个并发同步工具,它提供了对动态数量的线程的同步能力。与 CyclicBarrier
和 CountDownLatch
不同,Phaser
不需要预先知道等待的线程数量,并且支持多阶段的同步操作。Phaser
可以动态调整参与者的数量,并且可以在每个阶段结束时执行特定的任务。
6.1 基本概念
- Party:在
Phaser
的上下文中,一个party
可以是一个线程,也可以是一个任务。当我们在Phaser
上注册一个party
时,Phaser
会递增它的参与者数量。 - Arrive:对应一个
party
的状态,初始时是unarrived
,当调用arriveAndAwaitAdvance()
或者arriveAndDeregister()
进入arrive
状态,可以通过getUnarrivedParties()
获取当前未到达的数量。 - Register:注册一个新的
party
到Phaser
。 - Deregister:减少一个
party
。 - Phase:阶段,当所有注册的
party
都arrive
之后,将会调用Phaser
的onAdvance()
方法来判断是否要进入下一阶段。
6.2 主要方法
-
构造方法:
public Phaser() public Phaser(int parties) public Phaser(Phaser parent) public Phaser(Phaser parent, int parties)
-
register()
:public int register()
注册一个新的
party
。 -
arriveAndAwaitAdvance()
:public int arriveAndAwaitAdvance()
到达并等待其他
party
。 -
arriveAndDeregister()
:public int arriveAndDeregister()
到达并注销当前
party
。 -
getPhase()
:public final int getPhase()
获取当前阶段。
-
onAdvance(int phase, int registeredParties)
:protected boolean onAdvance(int phase, int registeredParties)
在所有
party
到达后执行的任务,返回true
表示终止Phaser
。
6.3 使用案例
以下是一个模拟游戏关卡加载的示例,其中只有第一个关卡需要加载新手教程模块:
public class PhaserDemo {
static class PreTaskThread implements Runnable {
private String task;
private Phaser phaser;
public PreTaskThread(String task, Phaser phaser) {
this.task = task;
this.phaser = phaser;
}
@Override
public void run() {
for (int i = 1; i < 4; i++) {
try {
// 第二次关卡起不加载NPC,跳过
if (i >= 2 && "加载新手教程".equals(task)) {
continue;
}
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
i, phaser.getRegisteredParties(), task));
// 从第二个关卡起,不加载NPC
if (i == 1 && "加载新手教程".equals(task)) {
System.out.println("下次关卡移除加载【新手教程】模块");
phaser.arriveAndDeregister(); // 移除一个模块
} else {
phaser.arriveAndAwaitAdvance();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(4) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("第%d次关卡准备完成", phase + 1));
return phase == 3 || registeredParties == 0;
}
};
new Thread(new PreTaskThread("加载地图数据", phaser)).start();
new Thread(new PreTaskThread("加载人物模型", phaser)).start();
new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
new Thread(new PreTaskThread("加载新手教程", phaser)).start();
}
}
输出示例:
关卡 1,需要加载 4 个模块,当前模块【加载背景音乐】
关卡 1,需要加载 4 个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡 1,需要加载 3 个模块,当前模块【加载地图数据】
关卡 1,需要加载 3 个模块,当前模块【加载人物模型】
第 1 次关卡准备完成
关卡 2,需要加载 3 个模块,当前模块【加载地图数据】
关卡 2,需要加载 3 个模块,当前模块【加载背景音乐】
关卡 2,需要加载 3 个模块,当前模块【加载人物模型】
第 2 次关卡准备完成
关卡 3,需要加载 3 个模块,当前模块【加载人物模型】
关卡 3,需要加载 3 个模块,当前模块【加载地图数据】
关卡 3,需要加载 3 个模块,当前模块【加载背景音乐】
第 3 次关卡准备完成
在这个示例中,Phaser
动态调整了参与者的数量,并且在每个阶段结束时执行了特定的任务。
6.4 原理
Phaser
的实现原理相对复杂,内部使用了两个基于 Fork-Join
框架的原子类来辅助实现:
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;
static final class QNode implements ForkJoinPool.ManagedBlocker {
// 实现代码
}
Phaser
通过这些原子类来管理参与者的状态和阶段的转换。当所有参与者到达当前阶段时,会调用 onAdvance()
方法,判断是否进入下一阶段。
7 小结
JDK 提供的这些并发工具类各有特点,适用于不同的场景:
Semaphore
适用于限制并发访问的线程数。Exchanger
适用于两个线程之间的数据交换。CountDownLatch
适用于等待多个线程完成任务后再继续执行。CyclicBarrier
适用于多个线程在某个点上相互等待,并且可以重复使用。Phaser
是CyclicBarrier
的增强版,支持动态调整参与线程数和多阶段同步。
根据实际需求选择合适的工具类,可以大大简化并发编程的复杂度。
8 思维导图
9 参考链接
Java并发编程通信工具类 Semaphore、Exchanger、CountDownLatch、CyclicBarrier、Phaser等一网打尽
原文地址:https://blog.csdn.net/gaosw0521/article/details/144154292
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!