自学内容网 自学内容网

JDK 并发编程工具类详解:CountDownLatch、Semaphore、Exchanger、CyclicBarrier 和 Phaser

在 Java 并发编程中,JDK 提供了一些强大的工具类来帮助开发者处理线程间的同步和通信问题。这些工具类主要包括 CountDownLatchSemaphoreExchangerCyclicBarrierPhaser,它们都位于 java.util.concurrent 包下,统称为 JUC(Java Util Concurrent)工具类。本文将详细介绍这些工具类的作用、使用方法和原理。

1 工具类概览

类名作用
Semaphore限制线程的数量,控制并发访问的线程数
Exchanger两个线程之间交换数据
CountDownLatch线程等待直到计数器减为 0 时开始工作
CyclicBarrier类似于 CountDownLatch,但可以重复使用
Phaser增强的 CyclicBarrier,支持动态调整参与线程数和多阶段同步

2 Semaphore:信号量

Semaphore 是 Java 并发编程中用于控制并发访问线程数量的工具类。它的名字来源于“信号”(Semaphore),意味着它通过信号来控制线程的访问。Semaphore 维护了一个计数器(permit),线程在访问资源前需要获取一个 permit,访问结束后释放 permit。通过这种方式,Semaphore 可以有效地限制同时访问某个资源的线程数量。

2.1 构造方法

Semaphore 提供了两种构造方法:

  • 非公平模式:默认情况下,Semaphore 是非公平的,即线程获取 permit 的顺序不保证是 FIFO(先进先出)。
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
  • 公平模式:可以通过传入 true 来启用公平模式,确保线程获取 permit 的顺序是 FIFO。
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

2.2 主要方法

Semaphore 的主要方法包括:

  • acquire():获取一个 permit。如果没有可用 permit,线程将阻塞直到有 permit 可用。
  • release():释放一个 permit
  • acquire(int permits):获取多个 permit
  • release(int permits):释放多个 permit

每次调用 acquire 方法,permit 计数器会减少一个或多个。如果计数器减少到 0,再有线程调用 acquire,该线程将阻塞,直到有其他线程调用 release 释放 permit

2.3 使用案例

Semaphore 常用于资源有限的场景中,限制线程的数量。例如,限制同时只能有 3 个线程在工作:

public class SemaphoreDemo {
    static class MyThread implements Runnable {
        private int value;
        private Semaphore semaphore;

        public MyThread(int value, Semaphore semaphore) {
            this.value = value;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire(); // 获取 permit
                System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
                        value, semaphore.availablePermits(), semaphore.getQueueLength()));
                // 睡眠随机时间,打乱释放顺序
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(String.format("线程%d释放了资源", value));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release(); // 释放 permit
            }
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread(i, semaphore)).start();
        }
    }
}

输出示例:

当前线程是 1, 还剩 2 个资源,还有 0 个线程在等待
当前线程是 0, 还剩 1 个资源,还有 0 个线程在等待
当前线程是 6, 还剩 0 个资源,还有 0 个线程在等待
线程 6 释放了资源
当前线程是 2, 还剩 0 个资源,还有 6 个线程在等待
线程 2 释放了资源
当前线程是 4, 还剩 0 个资源,还有 5 个线程在等待
线程 0 释放了资源
当前线程是 7, 还剩 0 个资源,还有 4 个线程在等待
线程 1 释放了资源
当前线程是 8, 还剩 0 个资源,还有 3 个线程在等待
线程 7 释放了资源
当前线程是 5, 还剩 0 个资源,还有 2 个线程在等待
线程 4 释放了资源
当前线程是 3, 还剩 0 个资源,还有 1 个线程在等待
线程 8 释放了资源
当前线程是 9, 还剩 0 个资源,还有 0 个线程在等待
线程 9 释放了资源
线程 5 释放了资源
线程 3 释放了资源

在这个示例中,最开始是 1, 0, 6 这三个线程获得了资源,其他线程进入了等待队列。当某个线程释放资源后,等待队列中的线程会获得资源。

2.4 其他方法

Semaphore 还提供了一些其他方法,用于处理中断或避免阻塞:

  • acquireUninterruptibly():忽略中断,获取一个 permit
  • acquireUninterruptibly(int permits):忽略中断,获取多个 permit
  • tryAcquire():尝试获取一个 permit,如果成功则返回 true,否则返回 false,不进入阻塞队列。
  • tryAcquire(int permits):尝试获取多个 permit
  • tryAcquire(int permits, long timeout, TimeUnit unit):在指定时间内尝试获取多个 permit
  • tryAcquire(long timeout, TimeUnit unit):在指定时间内尝试获取一个 permit

2.5 原理

Semaphore 内部通过继承 AQS(AbstractQueuedSynchronizer)的 Sync 类来实现。Sync 类重写了 tryAcquireShared 方法,用于尝试获取资源。如果获取失败(即请求的资源数量小于当前可用资源数量),方法会返回一个负数,表示获取资源失败,当前线程将进入 AQS 的等待队列。

当有线程调用 release 方法释放 permit 时,AQS 会唤醒等待队列中的线程,使其有机会再次尝试获取资源。

3 Exchanger:数据交换器

Exchanger 是 Java 并发编程中用于两个线程之间交换数据的工具类。它支持泛型,因此可以在两个线程之间传送任何类型的数据。Exchanger 的特点是,当一个线程调用 exchange 方法后,会进入阻塞状态,直到另一个线程也调用 exchange 方法,两个线程才会交换数据并继续执行。

3.1 基本使用

以下是一个简单的示例,展示了如何使用 Exchanger 在两个线程之间交换字符串数据:

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
        Thread.sleep(1000);

        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

输出示例:

这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据

在这个示例中,线程A调用 exchange 方法后进入阻塞状态,等待线程B调用 exchange 方法。当线程B调用 exchange 方法后,两个线程交换数据并继续执行。

3.2 超时机制

Exchanger 还提供了一个带有超时参数的 exchange 方法,如果在指定时间内没有另一个线程调用 exchange 方法,就会抛出 TimeoutException 异常。

public V exchange(V x, long timeout, TimeUnit unit)

3.3 多线程交换

需要注意的是,Exchanger 只能用于两个线程之间的数据交换。如果有三个线程调用同一个 Exchanger 实例的 exchange 方法,只有前两个线程会交换数据,第三个线程会进入阻塞状态,直到有另一个线程调用 exchange 方法。

3.4 重复使用

Exchanger 是可以重复使用的。也就是说,两个线程可以在内存中不断地交换数据。每次调用 exchange 方法都会进行一次数据交换。

3.5 原理

Exchanger 内部使用 park/unpark 机制来实现线程的等待和唤醒。在使用 park/unpark 方法之前,会通过 CAS(Compare-And-Swap)检查,以提高性能。CAS 是一种乐观锁机制,用于确保线程安全。

3.6 应用场景

Exchanger 适用于以下场景:

  • 基因算法:在基因算法中,两个线程可以交换基因片段。
  • 流水线设计:在流水线设计中,两个线程可以交换中间结果。
  • 数据同步:在需要两个线程之间同步数据的场景中,可以使用 Exchanger 进行数据交换。

4 CountDownLatch:倒计时门闩

CountDownLatch 是 Java 并发编程中用于线程同步的工具类。它的名字由两部分组成:CountDown 表示计数递减,Latch 表示“门闩”或“屏障”。CountDownLatch 的作用是让一个或多个线程等待其他线程完成一些前置任务,只有当所有前置任务完成后,等待的线程才能继续执行。

4.1 基本方法

CountDownLatch 提供了以下几个主要方法:

  • 构造方法

    public CountDownLatch(int count)
    

    构造方法中传入的 count 表示需要等待的前置任务数量。

  • await()

    public void await() throws InterruptedException
    

    调用 await() 方法的线程会进入等待状态,直到 count 值减为 0。

  • await(long timeout, TimeUnit unit)

    public boolean await(long timeout, TimeUnit unit) throws InterruptedException
    

    带有超时参数的 await 方法,如果在指定时间内 count 没有减为 0,线程会继续执行,并返回 false

  • countDown()

    public void countDown()
    

    每次调用 countDown() 方法,count 值会减 1。

  • getCount()

    public long getCount()
    

    获取当前 count 的值。

4.2 使用案例

以下是一个模拟游戏加载前置任务的示例:

public class CountDownLatchDemo {
    // 定义前置任务线程
    static class PreTaskThread implements Runnable {
        private String task;
        private CountDownLatch countDownLatch;

        public PreTaskThread(String task, CountDownLatch countDownLatch) {
            this.task = task;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            try {
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(task + " - 任务完成");
                countDownLatch.countDown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        // 假设有三个模块需要加载
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 主任务
        new Thread(() -> {
            try {
                System.out.println("等待数据加载...");
                System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
                countDownLatch.await();
                System.out.println("数据加载完成,正式开始游戏!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 前置任务
        new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
        new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
        new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
    }
}

输出示例:

等待数据加载...
还有 3 个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

在这个示例中,主线程会等待所有前置任务完成后,才继续执行游戏开始的逻辑。

4.3 原理

CountDownLatch 的实现原理相对简单,内部通过继承 AQS(AbstractQueuedSynchronizer)的 Sync 类来实现。Sync 类重写了 tryAcquireShared 方法,用于尝试获取资源。当 count 值减为 0 时,所有等待的线程会被唤醒。

需要注意的是,CountDownLatchcount 值在构造时设置,且只能被设置一次。CountDownLatch 没有提供任何机制去重新设置这个计数值。

5 CyclicBarrier:循环屏障

CyclicBarrier 是 Java 并发编程中用于线程同步的工具类。它的名字由两部分组成:Cyclic 表示循环,Barrier 表示屏障。与 CountDownLatch 不同,CyclicBarrier 可以重复使用,并且可以在达到屏障时执行一个任务。

5.1 基本方法

CyclicBarrier 提供了以下几个主要方法:

  • 构造方法

    public CyclicBarrier(int parties)
    public CyclicBarrier(int parties, Runnable barrierAction)
    

    构造方法中传入的 parties 表示需要等待的线程数量。barrierAction 是一个可选的 Runnable 对象,当所有线程到达屏障时会执行这个任务。

  • await()

    public int await() throws InterruptedException, BrokenBarrierException
    

    调用 await() 方法的线程会进入等待状态,直到所有线程都调用了 await() 方法。

  • await(long timeout, TimeUnit unit)

    public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException
    

    带有超时参数的 await 方法,如果在指定时间内没有达到屏障,线程会继续执行,并抛出 TimeoutException 异常。

  • reset()

    public void reset()
    

    重置屏障,允许重新使用。

  • isBroken()

    public boolean isBroken()
    

    检查屏障是否被破坏。

5.2 使用案例

以下是一个模拟游戏关卡加载的示例:

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private CyclicBarrier cyclicBarrier;

        public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
            this.task = task;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            // 假设总共三个关卡
            for (int i = 1; i < 4; i++) {
                try {
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d的任务%s完成", i, task));
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
            System.out.println("本关卡所有前置任务完成,开始游戏...");
        });

        new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
        new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
    }
}

输出示例:

关卡 1 的任务加载地图数据完成
关卡 1 的任务加载背景音乐完成
关卡 1 的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...
关卡 2 的任务加载地图数据完成
关卡 2 的任务加载背景音乐完成
关卡 2 的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏...
关卡 3 的任务加载人物模型完成
关卡 3 的任务加载地图数据完成
关卡 3 的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏...

在这个示例中,每个关卡的前置任务完成后,所有线程会等待其他线程也完成任务,然后一起进入下一个关卡。

5.3 原理

CyclicBarrier 的实现原理与 CountDownLatch 不同。CyclicBarrier 内部使用 LockCondition 实现的等待/通知模式。当所有线程调用 await() 方法后,会触发屏障操作,执行构造方法中传入的 barrierAction

CyclicBarrier 的核心方法是 dowait(boolean timed, long nanos),它负责处理线程的等待和唤醒逻辑。当所有线程到达屏障时,会执行 barrierAction,然后重置屏障,允许重新使用。

6 Phaser:阶段同步器

Phaser 是 Java 7 中引入的一个并发同步工具,它提供了对动态数量的线程的同步能力。与 CyclicBarrierCountDownLatch 不同,Phaser 不需要预先知道等待的线程数量,并且支持多阶段的同步操作。Phaser 可以动态调整参与者的数量,并且可以在每个阶段结束时执行特定的任务。

6.1 基本概念

  • Party:在 Phaser 的上下文中,一个 party 可以是一个线程,也可以是一个任务。当我们在 Phaser 上注册一个 party 时,Phaser 会递增它的参与者数量。
  • Arrive:对应一个 party 的状态,初始时是 unarrived,当调用 arriveAndAwaitAdvance() 或者 arriveAndDeregister() 进入 arrive 状态,可以通过 getUnarrivedParties() 获取当前未到达的数量。
  • Register:注册一个新的 partyPhaser
  • Deregister:减少一个 party
  • Phase:阶段,当所有注册的 partyarrive 之后,将会调用 PhaseronAdvance() 方法来判断是否要进入下一阶段。

6.2 主要方法

  • 构造方法

    public Phaser()
    public Phaser(int parties)
    public Phaser(Phaser parent)
    public Phaser(Phaser parent, int parties)
    
  • register()

    public int register()
    

    注册一个新的 party

  • arriveAndAwaitAdvance()

    public int arriveAndAwaitAdvance()
    

    到达并等待其他 party

  • arriveAndDeregister()

    public int arriveAndDeregister()
    

    到达并注销当前 party

  • getPhase()

    public final int getPhase()
    

    获取当前阶段。

  • onAdvance(int phase, int registeredParties)

    protected boolean onAdvance(int phase, int registeredParties)
    

    在所有 party 到达后执行的任务,返回 true 表示终止 Phaser

6.3 使用案例

以下是一个模拟游戏关卡加载的示例,其中只有第一个关卡需要加载新手教程模块:

public class PhaserDemo {
    static class PreTaskThread implements Runnable {
        private String task;
        private Phaser phaser;

        public PreTaskThread(String task, Phaser phaser) {
            this.task = task;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            for (int i = 1; i < 4; i++) {
                try {
                    // 第二次关卡起不加载NPC,跳过
                    if (i >= 2 && "加载新手教程".equals(task)) {
                        continue;
                    }
                    Random random = new Random();
                    Thread.sleep(random.nextInt(1000));
                    System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
                            i, phaser.getRegisteredParties(), task));

                    // 从第二个关卡起,不加载NPC
                    if (i == 1 && "加载新手教程".equals(task)) {
                        System.out.println("下次关卡移除加载【新手教程】模块");
                        phaser.arriveAndDeregister(); // 移除一个模块
                    } else {
                        phaser.arriveAndAwaitAdvance();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        Phaser phaser = new Phaser(4) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println(String.format("第%d次关卡准备完成", phase + 1));
                return phase == 3 || registeredParties == 0;
            }
        };

        new Thread(new PreTaskThread("加载地图数据", phaser)).start();
        new Thread(new PreTaskThread("加载人物模型", phaser)).start();
        new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
        new Thread(new PreTaskThread("加载新手教程", phaser)).start();
    }
}

输出示例:

关卡 1,需要加载 4 个模块,当前模块【加载背景音乐】
关卡 1,需要加载 4 个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡 1,需要加载 3 个模块,当前模块【加载地图数据】
关卡 1,需要加载 3 个模块,当前模块【加载人物模型】
第 1 次关卡准备完成
关卡 2,需要加载 3 个模块,当前模块【加载地图数据】
关卡 2,需要加载 3 个模块,当前模块【加载背景音乐】
关卡 2,需要加载 3 个模块,当前模块【加载人物模型】
第 2 次关卡准备完成
关卡 3,需要加载 3 个模块,当前模块【加载人物模型】
关卡 3,需要加载 3 个模块,当前模块【加载地图数据】
关卡 3,需要加载 3 个模块,当前模块【加载背景音乐】
第 3 次关卡准备完成

在这个示例中,Phaser 动态调整了参与者的数量,并且在每个阶段结束时执行了特定的任务。

6.4 原理

Phaser 的实现原理相对复杂,内部使用了两个基于 Fork-Join 框架的原子类来辅助实现:

private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

static final class QNode implements ForkJoinPool.ManagedBlocker {
    // 实现代码
}

Phaser 通过这些原子类来管理参与者的状态和阶段的转换。当所有参与者到达当前阶段时,会调用 onAdvance() 方法,判断是否进入下一阶段。

7 小结

JDK 提供的这些并发工具类各有特点,适用于不同的场景:

  • Semaphore 适用于限制并发访问的线程数。
  • Exchanger 适用于两个线程之间的数据交换。
  • CountDownLatch 适用于等待多个线程完成任务后再继续执行。
  • CyclicBarrier 适用于多个线程在某个点上相互等待,并且可以重复使用。
  • PhaserCyclicBarrier 的增强版,支持动态调整参与线程数和多阶段同步。

根据实际需求选择合适的工具类,可以大大简化并发编程的复杂度。

8 思维导图

在这里插入图片描述

9 参考链接

Java并发编程通信工具类 Semaphore、Exchanger、CountDownLatch、CyclicBarrier、Phaser等一网打尽


原文地址:https://blog.csdn.net/gaosw0521/article/details/144154292

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!