自学内容网 自学内容网

Java 编码系列:并发工具类详解

引言

在多线程编程中,除了基本的同步机制外,Java 还提供了丰富的并发工具类,这些工具类可以帮助开发者更高效地管理和协调多线程任务。本文将详细介绍 Executor 框架、CountDownLatchCyclicBarrierSemaphore 等并发工具类的工作原理、使用方法,并结合大厂的最佳实践和底层核心原理,帮助读者深入理解这些工具类的应用。

1. Executor 框架
1.1 基本概念

Executor 框架是 Java 并发库中的一个重要组成部分,它提供了一种将任务提交与任务执行解耦的方式。通过 Executor 框架,可以方便地管理和调度线程,提高程序的性能和可维护性。

1.2 主要接口和实现
  • Executor 接口:最基本的接口,定义了 execute(Runnable command) 方法,用于执行任务。
  • ExecutorService 接口:继承自 Executor 接口,提供了更丰富的功能,如任务提交、关闭和生命周期管理。
  • ThreadPoolExecutor 类ExecutorService 的一个具体实现,提供了线程池的功能。
  • ScheduledExecutorService 接口:继承自 ExecutorService 接口,提供了定时任务和周期任务的执行功能。
  • ScheduledThreadPoolExecutor 类ScheduledExecutorService 的一个具体实现。
1.3 使用方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕");
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}
1.4 底层原理
  • 线程池ThreadPoolExecutor 是 ExecutorService 的主要实现类,它通过维护一个线程池来管理线程的创建和销毁。
  • 工作队列:任务提交后会被放入一个工作队列中,线程池中的空闲线程会从队列中取出任务并执行。
  • 拒绝策略:当线程池中的线程数量达到上限且工作队列已满时,新的任务会被拒绝。ThreadPoolExecutor 提供了多种拒绝策略,如 AbortPolicyCallerRunsPolicy 等。
1.5 优缺点
  • 优点
    • 提高了线程的复用率,减少了线程创建和销毁的开销。
    • 提供了丰富的配置选项,可以根据业务需求灵活调整线程池的大小和行为。
    • 支持任务的异步执行和结果返回。
  • 缺点
    • 配置复杂,需要合理设置线程池的参数,否则可能导致性能问题。
2. CountDownLatch
2.1 基本概念

CountDownLatch 是一个同步辅助类,允许一个或多个线程等待其他线程完成操作后再继续执行。它通过一个计数器来实现这一功能,当计数器的值为0时,所有等待的线程将被释放。

2.2 使用方法
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 3;
        CountDownLatch latch = new CountDownLatch(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            int taskId = i;
            new Thread(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕");
                latch.countDown(); // 减少计数器的值
            }).start();
        }

        latch.await(); // 等待所有任务完成
        System.out.println("所有任务已完成");
    }
}
2.3 底层原理
  • AQS(AbstractQueuedSynchronizer)CountDownLatch 的实现基于 AQS,通过一个 volatile int 变量来管理计数器的值。
  • 同步队列:当计数器的值大于0时,调用 await 方法的线程会被阻塞并加入同步队列,直到计数器的值变为0。
2.4 优缺点
  • 优点
    • 简单易用,适用于一个或多个线程等待其他线程完成任务的场景。
    • 支持超时等待,可以通过 await(long timeout, TimeUnit unit) 方法指定等待时间。
  • 缺点
    • 计数器的值只能递减,不能重置,如果需要重置计数器,需要创建新的 CountDownLatch 实例。
3. CyclicBarrier
3.1 基本概念

CyclicBarrier 是一个同步辅助类,允许一组线程互相等待,直到所有线程都到达一个屏障点。与 CountDownLatch 不同,CyclicBarrier 的计数器可以重置,因此可以重复使用。

3.2 使用方法
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numberOfThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("所有任务已完成,执行额外操作");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            int taskId = i;
            new Thread(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕,等待其他任务");
                try {
                    barrier.await(); // 等待其他任务
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 继续执行");
            }).start();
        }
    }
}
3.3 底层原理
  • AQS(AbstractQueuedSynchronizer)CyclicBarrier 的实现也基于 AQS,通过一个 volatile int 变量来管理计数器的值。
  • 同步队列:当计数器的值大于0时,调用 await 方法的线程会被阻塞并加入同步队列,直到计数器的值变为0。
3.4 优缺点
  • 优点
    • 支持计数器的重置,可以重复使用。
    • 支持屏障动作,可以在所有线程到达屏障点后执行额外的操作。
  • 缺点
    • 如果某个线程在等待过程中被中断或发生异常,会导致其他线程无法继续执行,需要处理 BrokenBarrierException
4. Semaphore
4.1 基本概念

Semaphore 是一个同步辅助类,用于控制同时访问特定资源的线程数量。它通过一个计数器来实现这一功能,当计数器的值大于0时,线程可以获取许可;否则,线程将被阻塞,直到有其他线程释放许可。

4.2 使用方法
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private final Semaphore semaphore = new Semaphore(3); // 最多允许3个线程同时访问

    public void accessResource() {
        try {
            semaphore.acquire();
            System.out.println("线程 " + Thread.currentThread().getName() + " 访问资源");
            Thread.sleep(1000); // 模拟资源访问时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        SemaphoreExample example = new SemaphoreExample();

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            new Thread(() -> {
                example.accessResource();
            }, "Thread-" + taskId).start();
        }
    }
}
4.3 底层原理
  • AQS(AbstractQueuedSynchronizer)Semaphore 的实现也基于 AQS,通过一个 volatile int 变量来管理许可的数量。
  • 同步队列:当许可的数量大于0时,调用 acquire 方法的线程可以获取许可;否则,线程将被阻塞并加入同步队列,直到有其他线程释放许可。
4.4 优缺点
  • 优点
    • 控制同时访问资源的线程数量,避免资源过载。
    • 支持公平和非公平模式,可以根据业务需求选择合适的模式。
  • 缺点
    • 需要手动管理许可的获取和释放,容易出现遗漏。
5. 大厂最佳实践
5.1 阿里巴巴《Java开发手册》
  • 线程池配置:合理配置线程池的大小,避免线程过多导致系统资源耗尽。
  • 任务提交:使用 submit 方法提交任务,以便获取任务的执行结果。
  • 资源管理:使用 try-with-resources 语句管理资源,确保资源在使用后正确释放。
5.2 Google Java Style Guide
  • 异常处理:合理处理线程中的异常,避免未捕获的异常导致线程终止。
  • 资源管理:确保在多线程环境中正确使用锁,避免死锁和数据不一致。
  • 性能优化:合理使用并发工具类,避免过度同步导致性能下降。
5.3 Oracle 官方文档
  • 线程池:根据业务需求选择合适的线程池类型,如固定大小线程池、缓存线程池等。
  • 同步辅助类:合理使用 CountDownLatchCyclicBarrier 和 Semaphore 等同步辅助类,避免多线程环境下的数据不一致和死锁问题。
6. 底层核心原理
6.1 AQS(AbstractQueuedSynchronizer)
  • 状态管理:AQS 使用一个 volatile int state 变量来管理同步状态。
  • FIFO 队列:AQS 使用一个 FIFO 队列来管理等待获取锁或许可的线程。
  • 自定义同步器:AQS 提供了 acquire 和 release 方法,子类可以通过实现这些方法来自定义同步器。
6.2 线程池
  • 线程创建:线程池在初始化时创建一定数量的线程,这些线程会在空闲时等待任务。
  • 任务提交:任务提交后会被放入一个工作队列中,线程池中的空闲线程会从队列中取出任务并执行。
  • 线程回收:当线程池中的线程数量超过核心线程数且空闲时间超过设定值时,多余的线程将被回收。
6.3 同步辅助类
  • 计数器管理CountDownLatch 和 CyclicBarrier 通过一个 volatile int 变量来管理计数器的值。
  • 同步队列:当计数器的值大于0时,调用 await 方法的线程会被阻塞并加入同步队列,直到计数器的值变为0。
  • 许可管理Semaphore 通过一个 volatile int 变量来管理许可的数量,当许可的数量大于0时,调用 acquire 方法的线程可以获取许可;否则,线程将被阻塞并加入同步队列。
7. 示例代码
7.1 Executor 框架
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(4);

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕");
            });
        }

        // 关闭线程池
        executor.shutdown();
    }
}
7.2 CountDownLatch
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int numberOfThreads = 3;
        CountDownLatch latch = new CountDownLatch(numberOfThreads);

        for (int i = 0; i < numberOfThreads; i++) {
            int taskId = i;
            new Thread(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕");
                latch.countDown(); // 减少计数器的值
            }).start();
        }

        latch.await(); // 等待所有任务完成
        System.out.println("所有任务已完成");
    }
}
7.3 CyclicBarrier
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int numberOfThreads = 3;
        CyclicBarrier barrier = new CyclicBarrier(numberOfThreads, () -> {
            System.out.println("所有任务已完成,执行额外操作");
        });

        for (int i = 0; i < numberOfThreads; i++) {
            int taskId = i;
            new Thread(() -> {
                System.out.println("任务 " + taskId + " 开始执行,线程名: " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行时间
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 执行完毕,等待其他任务");
                try {
                    barrier.await(); // 等待其他任务
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("任务 " + taskId + " 继续执行");
            }).start();
        }
    }
}
7.4 Semaphore
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private final Semaphore semaphore = new Semaphore(3); // 最多允许3个线程同时访问

    public void accessResource() {
        try {
            semaphore.acquire();
            System.out.println("线程 " + Thread.currentThread().getName() + " 访问资源");
            Thread.sleep(1000); // 模拟资源访问时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        SemaphoreExample example = new SemaphoreExample();

        for (int i = 0; i < 10; i++) {
            int taskId = i;
            new Thread(() -> {
                example.accessResource();
            }, "Thread-" + taskId).start();
        }
    }
}
8. 总结

本文详细介绍了 Java 并发编程中的 Executor 框架、CountDownLatchCyclicBarrierSemaphore 等并发工具类的工作原理、使用方法,并结合大厂的最佳实践和底层核心原理,帮助读者深入理解这些工具类的应用。合理地使用并发工具类可以提高程序的性能和可靠性,避免多线程环境下的数据不一致和死锁问题。希望本文对你有所帮助,如果你有任何问题或建议,欢迎留言交流。


希望这篇文章能够满足你的需求,如果有任何进一步的问题或需要更多内容,请随时告诉我!


原文地址:https://blog.csdn.net/pjx987/article/details/142567843

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!