自学内容网 自学内容网

Java并发知识(JUC包)表格总结

     Java的java.util.concurrent(JUC)包是Java并发编程的核心包,提供了广泛的并发工具,用于创建线程安全的程序。表格总结的可能没有面面俱到,不过常用的一些基本上会总结到的,在项目实践中可以用,主要是作为复习之用。

目录

1. Executor框架

2.同步器

3.锁框架

4.原子变量

5.并发集合

6.线程池

7.Future和Callable接口

8.同步队列(BlockingQueue)

9.条件对象(Condition)

10.Fork/Join框架

11.CompletableFuture异步编程

12.Phaser多线程协调

13.StampedLock读写锁

14.LongAdder和DoubleAdder原子累加器

15.阻塞队列

16.线程局部变量

17.CompletionService


1. Executor框架

组件描述示例代码
Executor顶层接口,用于将任务提交给线程池执行。Executor executor = Executors.newFixedThreadPool(3);
ExecutorService接口,扩展了Executor,提供了管理线程池的方法。ExecutorService service = Executors.newFixedThreadPool(3);
Executors工厂类,用于创建预定义配置的线程池。Executors.newFixedThreadPool(3);
ThreadPoolExecutor可定制的线程池实现。new ThreadPoolExecutor(...);
ScheduledExecutorService用于延迟执行或定期执行任务的线程池接口。ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
Future表示异步计算的结果,可用于查询计算是否完成,等待计算结果。Future<?> future = executor.submit(()->{/* task */});

示例代码:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交任务给线程池
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            executor.submit(() -> {
                System.out.println("Task " + finalI + " is running.");
            });
        }
        
        // 关闭线程池,不再接受新任务,已提交的任务将完成后关闭
        executor.shutdown();
        
        try {
            // 等待线程池关闭,最多等待1分钟
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                // 如果超时,则强制关闭
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 线程被中断时的处理
            executor.shutdownNow();
        }
    }
}

 注意:这时示例代码,在项目实践中不建议直接使用Executors.newFixedThreadPool(3)这样的自动方式创建线程池,因为会有潜在的OOM的风险,而是推荐自定义线程池

 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("test-async-%d").build();
        ExecutorService executor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024),
                namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
2.同步器

同步器是用于控制并发任务之间协调的组件,它们可以用于同步线程的执行,确保线程安全,以及实现复杂的线程间协作。

类型/接口描述示例代码
CountDownLatch一个同步辅助类,允许一个或多个线程等待一组操作完成。CountDownLatch latch = new CountDownLatch(1);
CyclicBarrier类似于CountDownLatch,但可以重复使用。CyclicBarrier barrier = new CyclicBarrier(3);
Semaphore一个计数信号量,用于控制同时访问特定资源的线程数量。Semaphore semaphore = new Semaphore(3);
Exchanger一个同步辅助类,用于两个线程之间的数据交换。Exchanger<String> exchanger = new Exchanger<>();
Phaser一个更灵活的同步辅助类,用于线程之间的协调。Phaser phaser = new Phaser();

 示例代码:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SynchronizerExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 创建一个CyclicBarrier,用于3个线程之间的同步
        CyclicBarrier barrier = new CyclicBarrier(3);
        
        // 提交任务给线程池
        for (int i = 0; i < 3; i++) {
            final int taskNumber = i;
            executor.execute(() -> {
                System.out.println("Thread " + taskNumber + " is running.");
                try {
                    // 当一个线程到达屏障点时,等待其他线程
                    barrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("Thread " + taskNumber + " has passed the barrier.");
            });
        }
        
        // 关闭线程池
        executor.shutdown();
    }
}

这个示例展示了如何使用CyclicBarrier来同步多个线程,确保它们在某个点上都达到了同步状态。每个线程在执行到屏障点时会调用barrier.await(),这会导致线程阻塞,直到所有线程都到达屏障点,然后它们会同时继续执行

3.锁框架

在多线程环境中,锁是确保线程安全的关键机制。JUC包提供了多种锁的实现,用于控制对共享资源的访问。

锁类型描述示例代码
Lock顶层接口,提供了比synchronized更灵活的锁操作。Lock lock = new ReentrantLock();
ReentrantLock可重入锁,实现了Lock接口,提供了公平和非公平锁的选项。ReentrantLock lock = new ReentrantLock(true);
ReadWriteLock读写锁,允许多个读操作同时进行,但写操作是排他的。ReadWriteLock rwLock = new ReentrantReadWriteLock();
StampedLock一种新型的读写锁,相比ReadWriteLock,它提供了更多的灵活性。StampedLock stampedLock = new StampedLock();

示例代码:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockExample {
    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        lock.lock(); // 获取锁
        try {
            // 临界区:在这个区域内,只有当前线程可以执行
            System.out.println("Critical section: only one thread can execute here.");
        } finally {
            lock.unlock(); // 释放锁
        }
    }
}

在这个示例中,ReentrantLock被用来创建一个锁,以确保临界区内的代码只能由一个线程执行。lock()方法用于获取锁,而unlock()方法用于释放锁。注意,示例中使用了try-finally结构来确保即使在发生异常的情况下,锁也能被正确释放。

4.原子变量

原子变量是JUC包中提供的一种线程安全的变量,它们利用了底层的原子操作来保证在多线程环境下的安全性,而无需使用锁。

类型/接口描述示例代码
AtomicInteger一个使用int值的原子变量。AtomicInteger atomicInt = new AtomicInteger();
AtomicLong一个使用long值的原子变量。AtomicLong atomicLong = new AtomicLong();
AtomicReference一个使用任意引用类型的原子变量。AtomicReference<String> atomicRef = new AtomicReference<>();
AtomicBoolean一个使用boolean值的原子变量。AtomicBoolean atomicBool = new AtomicBoolean();

这些原子变量类提供了多种原子操作,如增加、减少、设置值、比较并设置等。

示例代码:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AtomicExample {
    public static void main(String[] args) {
        // 创建一个原子整数值
        AtomicInteger counter = new AtomicInteger(0);

        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交增加操作给线程池
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                for (int j = 0; j < 100; j++) {
                    counter.incrementAndGet(); // 原子增加操作
                }
            });
        }

        // 关闭线程池并等待所有任务完成
        executor.shutdown();
        while (!executor.isTerminated()) {
            // 等待线程池关闭
        }

        // 输出最终的计数值
        System.out.println("Final counter value: " + counter.get());
    }
}

在这个示例中,AtomicInteger被用来在多个线程中安全地累加数值。每个线程提交了一个增加操作给线程池,通过incrementAndGet()方法原子地增加计数器的值。

5.并发集合

并发集合是Java并发包中提供的线程安全的集合类,它们提供了比同步包装器(如Collections.synchronizedList)更高效的并发访问。

集合类型描述示例代码
ConcurrentHashMap线程安全的HashMap实现。ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
ConcurrentLinkedQueue高效的非阻塞线程安全队列。ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
ConcurrentSkipListMap跳表实现的线程安全有序映射。ConcurrentSkipListMap<String, String> skipMap = new ConcurrentSkipListMap<>();
CopyOnWriteArrayList线程安全的ArrayList,适用于读多写少的场景。CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
CopyOnWriteArraySet基于CopyOnWriteArrayList的线程安全Set实现。CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();

示例代码:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentCollectionExample {
    public static void main(String[] args) {
        // 创建线程安全的ConcurrentHashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交任务给线程池,更新map中的值
        for (int i = 0; i < 3; i++) {
            final String key = "Key" + i;
            executor.submit(() -> {
                // 使用原子方法更新map中的值
                map.compute(key, (k, v) -> {
                    if (v == null) v = 0;
                    return v + 1; // 原子增加操作
                });
            });
        }

        // 关闭线程池并等待所有任务完成
        executor.shutdown();
        while (!executor.isTerminated()) {
            // 等待线程池关闭
        }

        // 输出map中的值
        map.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}

在这个示例中,ConcurrentHashMap被用来在多个线程中安全地更新键值对。通过compute方法,可以原子地更新map中的值,而不需要额外的同步措施。

6.线程池

线程池是一种执行器(Executor),用于在一个后台线程中执行任务。线程池的主要目的是减少在创建和销毁线程时所产生的性能开销。Java提供了多种线程池的实现,允许开发者根据需要选择合适的线程池。

线程池类型描述示例代码
ThreadPoolExecutor提供了大量可配置的选项,是最常用的线程池实现。new ThreadPoolExecutor(...);
ScheduledThreadPoolExecutor用于延迟执行或定期执行任务的线程池。Executors.newScheduledThreadPool(3);
FixedThreadPool拥有固定数量线程的线程池。Executors.newFixedThreadPool(3);
SingleThreadExecutor只有一个线程的线程池,保证所有任务按顺序执行。Executors.newSingleThreadExecutor();
CachedThreadPool根据需要创建新线程的线程池,对于短生命周期的异步任务非常合适。Executors.newCachedThreadPool();

示例代码:

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小的线程池
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

        // 提交任务给线程池
        for (int i = 0; i < 5; i++) {
            int finalI = i;
            executor.submit(() -> {
                System.out.println("Task " + finalI + " is running on thread " + Thread.currentThread().getName());
            });
        }

        // 关闭线程池,不再接受新任务
        executor.shutdown();

        try {
            // 等待线程池关闭,最多等待1分钟
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                // 如果超时,则强制关闭
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 线程被中断时的处理
            executor.shutdownNow();
        }
    }
}

在这个示例中,我们创建了一个固定大小为3的ThreadPoolExecutor实例,并提交了5个任务。这些任务将由线程池中的线程执行。通过调用shutdown()方法,线程池将不再接受新任务,并且会等待已提交的任务执行完成。如果等待超过60秒,线程池将被强制关闭。

线程池更详细的内容可进一步查看我的另一篇博客:https://blog.csdn.net/u011174699/article/details/110296957?spm=1001.2014.3001.5502

7.Future和Callable接口

Future接口和Callable接口通常与线程池一起使用,用于异步执行任务并获取结果。

接口/类描述示例代码
Callable一个接口,允许在其call方法中抛出异常,并且可以返回结果。Callable<String> task = () -> { return "Done"; };
Future一个接口,代表异步计算的结果。它提供了检查计算是否完成的方法,以及等待计算结果的方法。Future<String> future = executor.submit(task);
FutureTask一个类,实现了Future接口,并且可以作为任务提交给线程池。FutureTask<String> futureTask = new FutureTask<>(task);

Future的主要方法包括:

  • boolean cancel(boolean mayInterruptIfRunning): 尝试取消执行。
  • boolean isCancelled(): 是否已经被取消。
  • boolean isDone(): 任务是否完成。
  • V get(): 获取任务的结果。
  • V get(long timeout, TimeUnit unit): 在指定的时间内获取任务的结果。

示例代码:

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        
        // 创建Callable任务
        Callable<String> callable = () -> {
            // 模拟任务执行时间
            Thread.sleep(1000);
            return "Callable task is done";
        };
        
        // 提交Callable任务到线程池,并返回Future对象
        Future<String> future = executorService.submit(callable);
        
        // 在主线程中执行其他任务,同时等待Callable任务完成
        System.out.println("Main thread is doing other work");
        
        // 获取Callable任务的结果
        String result = future.get();
        System.out.println(result);
        
        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个Callable任务,并将其提交给线程池。通过Future对象,我们能够获取任务的结果。Future.get()方法会阻塞当前线程,直到任务完成并返回结果。

8.同步队列(BlockingQueue

BlockingQueue是一个线程安全的队列,用于在多个线程之间安全地传输数据。当生产者线程尝试添加一个元素到已满的队列时,它将被阻塞,直到队列中有足够的空间。同样,当消费者线程尝试从一个空队列中取一个元素时,它也将被阻塞,直到队列中有元素可用。

接口/类描述示例代码
BlockingQueue顶层接口,定义了阻塞队列的基本操作。BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
ArrayBlockingQueue由数组支持的有界阻塞队列。ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
LinkedBlockingQueue由链表支持的可选容量的阻塞队列。LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
PriorityBlockingQueue无界优先级阻塞队列。PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
SynchronousQueue不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作。SynchronousQueue<Integer> queue = new SynchronousQueue<>();

BlockingQueue的主要方法包括:

  • void put(E e): 添加一个元素到队列,如果队列满了,则阻塞。
  • E take(): 移除并获取队列头部的元素,如果队列为空,则阻塞。
  • E poll(long timeout, TimeUnit unit): 如果可能,不阻塞地获取队列头部的元素,如果指定时间内队列为空,则返回null
  • E poll(): 如果可能,不阻塞地获取队列头部的元素,如果队列为空,则返回null

示例代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个有界阻塞队列
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);

        // 生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put(i);
                    System.out.println("Produced: " + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 从队列中取出元素
                    int number = queue.take();
                    System.out.println("Consumed: " + number);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 启动生产者和消费者线程
        producer.start();
        consumer.start();

        // 等待生产者和消费者线程结束
        producer.join();
        consumer.join();
    }
}

在这个示例中,我们创建了一个容量为5的LinkedBlockingQueue,并启动了一个生产者线程和一个消费者线程。生产者线程将数字添加到队列中,而消费者线程从队列中取出数字。由于队列是阻塞的,所以当生产者线程尝试添加元素到已满的队列时,它将被阻塞,直到队列中有空间。同样,消费者线程在尝试从一个空队列中取出元素时也会被阻塞。

9.条件对象(Condition)

在多线程编程中,条件对象允许线程等待一个条件成立,或者等待另一个线程通知。java.util.concurrent.locks包中的Condition接口提供了这样的功能,它与Lock接口一起使用,提供了比Object.wait()Object.notify()更灵活的线程间协调机制。

接口/类描述示例代码
Condition顶层接口,提供了线程等待和唤醒机制。Condition condition = lock.newCondition();
Lock顶层接口,提供了比synchronized更灵活的锁操作,并且可以与Condition一起使用。Lock lock = new ReentrantLock();
ReentrantLock实现了Lock接口的锁,可以与Condition一起使用。ReentrantLock lock = new ReentrantLock();

Condition的主要方法包括:

  • void await(): 导致当前线程等待,直到它被另一个线程唤醒。
  • void awaitUninterruptibly(): 与await()类似,但不响应中断。
  • long awaitNanos(long nanosTimeout): 在指定的时间内等待条件成立。
  • boolean await(long time, TimeUnit unit): 在指定的时间内等待条件成立,并返回条件是否在超时前成立。
  • void signal(): 唤醒一个等待在该条件上的线程。
  • void signalAll(): 唤醒所有等待在该条件上的线程。

示例代码:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
    public static void main(String[] args) {
        final Lock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();

        // 线程A
        new Thread(() -> {
            lock.lock();
            try {
                // 等待条件成立
                condition.await();
                System.out.println("Thread A: Condition was signaled.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }).start();

        // 线程B
        new Thread(() -> {
            lock.lock();
            try {
                // 模拟条件成立前的准备工作
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // 通知等待的线程条件已经成立
                condition.signal();
            } finally {
                lock.unlock();
            }
        }).start();
    }
}

在这个示例中,线程A等待一个条件成立,而线程B在模拟了一些工作之后,通过调用condition.signal()来通知线程A条件已经成立。线程A随后被唤醒并继续执行。

10.Fork/Join框架

Fork/Join框架是一种用于并行执行任务的框架,特别适合于可以被分解为小任务的递归算法,如排序、搜索和数学计算。它通过将任务分割成更小的任务并行执行,然后在适当的时候合并结果,从而提高性能。

类型/类描述示例代码
ForkJoinTask表示一个可以并行执行的任务。ForkJoinTask<Void> task = new RecursiveTask<Void>() { ... };
RecursiveTask继承自ForkJoinTask的抽象类,用于需要执行递归操作的任务。class MyRecursiveTask extends RecursiveTask<Integer> { ... }
ForkJoinPool用于执行ForkJoinTask的线程池。ForkJoinPool pool = new ForkJoinPool();

ForkJoinPool的主要方法包括:

  • invoke(task): 启动一个任务并等待其完成。
  • submit(task): 提交一个任务并返回一个Future对象。
  • execute(task): 执行一个任务,但不等待其完成。

示例代码:

import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinExample {
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 20;

        public SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long sum = 0;
            if (end - start < THRESHOLD) {
                // 如果数组的这部分足够小,直接计算
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
            } else {
                // 否则,将任务分成两部分
                int middle = (start + end) / 2;
                SumTask subtask1 = new SumTask(array, start, middle);
                SumTask subtask2 = new SumTask(array, middle, end);
                subtask1.fork();  // 异步执行第一个子任务
                subtask2.fork();  // 异步执行第二个子任务
                sum = subtask1.join() + subtask2.join(); // 等待两个子任务完成并合并结果
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        int[] array = { ... }; // 假设这是我们的数组
        ForkJoinPool pool = new ForkJoinPool();
        SumTask task = new SumTask(array, 0, array.length);
        long result = pool.invoke(task); // 启动任务并等待结果
        System.out.println("Sum: " + result);
    }
}

在这个示例中,我们创建了一个SumTask类,它继承自RecursiveTask,用于并行计算数组的和。当任务足够小的时候,我们直接计算它的和;否则,我们将任务分割成两个子任务并行执行它们,然后合并结果。

11.CompletableFuture异步编程

CompletableFuture是一个在Java 8中引入的类,它提供了一种异步编程的模型,允许开发者以声明式的方式编写异步代码。它封装了一个异步操作,并允许开发者添加回调函数来处理操作的结果。

类型/类描述示例代码
CompletableFuture表示一个可能还没有完成的异步操作。CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Done"; });
CompletionStage一个接口,表示一个异步操作的阶段,可以有返回值或没有。CompletionStage<String> stage = future.thenApply(s -> s + " Stage");

CompletableFuture的主要方法包括:

  • supplyAsync(Supplier): 提交一个异步的任务。
  • thenApply(Function): 当前操作完成后,应用一个函数。
  • thenAccept(Consumer): 当前操作完成后,执行一个动作。
  • thenRun(Runnable): 当前操作完成后,运行一个动作。
  • thenCombine(CompletionStage, BiFunction): 将两个CompletionStage的结果合并。
  • thenAcceptBoth(CompletionStage, BiConsumer): 将当前操作的结果与其他操作的结果结合。
  • allOf(CompletableFuture...): 等待多个CompletableFuture完成。
  • anyOf(CompletableFuture...): 等待多个CompletableFuture中的任何一个完成。

示例代码:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 创建一个CompletableFuture任务
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            // 模拟长时间运行的任务
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Task is done";
        });

        // 为CompletableFuture添加回调
        future.thenAccept(result -> {
            System.out.println(result);
        });

        // 等待任务完成(阻塞调用)
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

在这个示例中,我们创建了一个异步执行的CompletableFuture任务,并为其添加了一个回调,当任务完成时,回调会被触发并打印结果。然后,我们使用get()方法等待任务完成,这个方法是阻塞的。

这个在项目中是很常用的,尤其用于提升接口响应速度特别好用,特别是那种需要多个查询并汇总数据后返回的接口。

12.Phaser多线程协调

Phaser是一个同步辅助类,用于协调多个线程在某些阶段的执行。它允许线程在不同的“阶段”(phase)中等待,直到到达某个安全点(barrier)才开始执行下一个阶段的操作。

类型/类描述示例代码
Phaser用于多线程协调的类,允许线程在不同阶段同步。Phaser phaser = new Phaser();

Phaseer的主要方法包括:

  • int getPhase(): 返回当前阶段的编号。
  • boolean arrive(): 当前线程到达,增加到达数。
  • boolean arriveAndAwaitAdvance(): 当前线程到达并且等待直到进入下一个阶段。
  • boolean awaitAdvance(int phase): 等待直到进入指定的阶段。
  • int getRegisteredParties(): 返回注册的线程数量。

示例代码:

import java.util.concurrent.Phaser;

public class PhaserExample {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("Phase " + phase + " is completed with " + registeredParties + " registered parties.");
                return true; // 继续到下一个阶段
            }
        };

        for (int i = 0; i < 3; i++) {
            final int taskNumber = i;
            new Thread(() -> {
                System.out.println("Task " + taskNumber + " is running.");
                phaser.arriveAndAwaitAdvance(); // 到达并等待下一个阶段
                System.out.println("Task " + taskNumber + " is resuming after phase " + phaser.getPhase());
            }).start();
        }
    }
}

在这个示例中,我们创建了一个Phaser实例,并重写了onAdvance方法来输出每个阶段完成的信息。每个任务线程通过调用arriveAndAwaitAdvance()来注册自己并等待其他任务到达,直到所有任务都到达后,它们将一起进入下一个阶段。

13.StampedLock读写锁

StampedLock是Java 8引入的一种新的锁机制,用于提供一种更高效的读写锁。与ReadWriteLock相比,StampedLock提供了更细粒度的控制,并且可以避免写饥饿的问题。

类型/类描述示例代码
StampedLock一种新的读写锁,提供了乐观读锁和悲观写锁。StampedLock stampedLock = new StampedLock();

StampededLock的主要方法包括:

  • long tryOptimisticRead(): 尝试乐观读锁,适用于读操作远多于写操作的场景。
  • long readLock(): 获取读锁。
  • long tryReadLock(): 尝试获取读锁。
  • void unlock(long stamp): 释放锁,stamp是获取锁时返回的标记。
  • long writeLock(): 获取写锁。
  • long tryWriteLock(): 尝试获取写锁。

示例代码

import java.util.concurrent.locks.StampedLock;

public class StampedLockExample {
    private static final StampedLock stampedLock = new StampedLock();
    private static final int[] data = new int[100];

    public static void main(String[] args) {
        // 启动读操作线程
        Thread reader = new Thread(() -> {
            long stamp = stampedLock.tryOptimisticRead();
            // 执行读操作
            int sum = 0;
            for (int value : data) {
                sum += value;
            }
            // 验证读操作期间是否发生了写
            if (stampedLock.validate(stamp)) {
                System.out.println("Sum (optimistic read): " + sum);
            } else {
                // 如果发生了写,重新获取读锁
                stamp = stampedLock.readLock();
                try {
                    int sum2 = 0;
                    for (int value : data) {
                        sum2 += value;
                    }
                    System.out.println("Sum (re-read): " + sum2);
                } finally {
                    stampedLock.unlock(stamp);
                }
            }
        });

        // 启动写操作线程
        Thread writer = new Thread(() -> {
            long stamp = stampedLock.writeLock();
            try {
                // 执行写操作
                for (int i = 0; i < data.length; i++) {
                    data[i] = i;
                }
                System.out.println("Data initialized.");
            } finally {
                stampedLock.unlock(stamp);
            }
        });

        reader.start();
        writer.start();
    }
}

在这个示例中,我们创建了一个StampedLock实例,并使用它来同步对共享数据数组data的读写操作。读线程尝试使用乐观读锁来计算数组的和,如果在计算过程中发生了写操作,它会重新获取读锁并再次计算。写线程则直接获取写锁来初始化数组。

14.LongAdder和DoubleAdder原子累加器

LongAdderDoubleAdder是Java 8中引入的用于进行原子累加操作的类,它们适用于对longdouble数值进行频繁的累加操作,尤其是在高并发场景下。与AtomicLong相比,LongAdder在大量更新时通常提供更好的性能,因为它使用一种细粒度的算法来减少冲突。

类型/类描述示例代码
LongAdder一个用于long数值的原子累加器。LongAdder adder = new LongAdder();
DoubleAdder一个用于double数值的原子累加器。DoubleAdder adder = new DoubleAdder();

LongAdderDoubleAdder的主要方法包括:

  • void add(long): 添加值到累加器。
  • void add(double)DoubleAdder中用于添加值。
  • long sum(): 返回累加器当前的值。
  • long sumThenReset(): 返回累加器当前的值并重置累加器为0。

示例代码:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.LongAdder;

public class AdderExample {
    private static final LongAdder adder = new LongAdder();

    public static void main(String[] args) {
        // 创建多个线程对共享的LongAdder进行累加操作
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    adder.add(ThreadLocalRandom.current().nextLong());
                }
            }).start();
        }

        // 等待所有线程完成累加
        while (Thread.activeCount() > 1) {
            Thread.yield();
        }

        // 输出累加结果
        System.out.println("Sum: " + adder.sum());
    }
}

在这个示例中,我们创建了一个LongAdder实例,并启动了多个线程对它进行累加操作。每个线程生成随机的长整型数字并累加到LongAdder中。所有线程完成后,我们输出累加的结果

15.阻塞队列
类型/类描述示例代码
ArrayBlockingQueue由数组支持的有界阻塞队列。ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
LinkedBlockingQueue由链表支持的可选容量的阻塞队列。LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
PriorityBlockingQueue无界优先级阻塞队列,按照自然顺序排序元素。PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(10);
DelayQueue无界阻塞队列,使用优先级队列,元素只有在达到指定的延迟时间后才可用。DelayQueue<Delayed> queue = new DelayQueue<>();
SynchronousQueue不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作。SynchronousQueue<Integer> queue = new SynchronousQueue<>();

通常与线程池结合使用,是自定义线程池时的其中的一个参数,与内置的几种自动创建线程池的对应关系

线程池        阻塞队列
FixedThreadPoolLinkedBlockingQueue
SingleThreadExecutorLinked BlockingQueue
CachedThreadPoolSynchronousQueue
ScheduledThreadPoolDelayedWorkQueue
SingleThreadScheduled ExecutorDelayedWorkQueue

 示例代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个有界阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // 创建一个固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 生产者线程
        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 将元素添加到队列
                    queue.put(i);
                    System.out.println("Produced: " + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 消费者线程
        executor.submit(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    // 从队列中取出元素
                    int number = queue.take();
                    System.out.println("Consumed: " + number);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 关闭线程池
        executor.shutdown();
    }
}

在这个示例中,我们创建了一个ArrayBlockingQueue实例,它是一个有界阻塞队列,然后创建了一个线程池。生产者线程向队列中添加数字,而消费者线程从队列中取出数字。由于队列是阻塞的,所以当生产者尝试添加元素到已满的队列时,它将被阻塞,直到队列中有空间。同样,消费者在尝试从一个空队列中取出元素时也会被阻塞。

16.线程局部变量

线程局部变量(ThreadLocal)是一种特殊的变量,每个线程都独立拥有一份副本,线程之间的副本互不影响。ThreadLocal类提供了线程局部变量的实现,非常适合用于存储那些需要线程隔离的信息,比如事务ID、用户会话等。

类型/类描述示例代码
ThreadLocal线程局部变量,为每个线程提供独立的变量副本。ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

ThreadLocal的主要方法包括:

  • void set(T value): 为当前线程设置线程局部变量的值。
  • T get(): 获取当前线程的线程局部变量副本的值。
  • remove(): 移除当前线程的线程局部变量副本。

示例代码:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalExample {
    private static final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // 提交任务到线程池
        executorService.submit(() -> {
            // 为当前线程设置ThreadLocal的值
            threadLocal.set(1);
            System.out.println("ThreadLocal value: " + threadLocal.get());
        });

        executorService.submit(() -> {
            // 尝试获取ThreadLocal的值,预期为null,因为每个线程有自己的副本
            System.out.println("ThreadLocal value: " + threadLocal.get());
        });

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个ThreadLocal实例,并在两个不同的线程中对它进行操作。第一个线程通过调用set()方法为ThreadLocal设置了一个值,然后通过调用get()方法获取并打印了这个值。第二个线程尝试获取ThreadLocal的值,但由于每个线程拥有自己的副本,它将得到null

17.CompletionService

CompletionService是一个接口,它将任务的异步执行与线程池结合在一起,并提供了一种机制,允许你顺序地获取任务的结果。CompletionService通常与Executor(如ThreadPoolExecutorScheduledThreadPoolExecutor)一起使用,可以按任务提交的顺序检索任务的结果。

方法描述
Future take()检索下一个已完成的任务,如果没有任务完成则阻塞。
Future poll(long timeout, TimeUnit unit)在给定的时间内等待并检索下一个已完成的任务,如果超时则返回null
Future poll()不阻塞地检索下一个已完成的任务,如果没有任务完成则返回null

示例代码:

import java.util.concurrent.*;

public class CompletionServiceExample {
    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        // 创建CompletionService
        CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);

        // 提交任务到CompletionService
        completionService.submit(() -> "Task 1 completed");
        completionService.submit(() -> "Task 2 completed");
        completionService.submit(() -> "Task 3 completed");

        // 按顺序获取任务结果
        try {
            Future<String> future1 = completionService.take(); // 阻塞直到下一个任务完成
            System.out.println(future1.get());

            Future<String> future2 = completionService.poll(2, TimeUnit.SECONDS); // 2秒超时
            if (future2 != null) {
                System.out.println(future2.get());
            }

            Future<String> future3 = completionService.poll();
            if (future3 != null) {
                System.out.println(future3.get());
            }
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executorService.shutdown();
        }
    }
}

在这个示例中,我们创建了一个ExecutorService和基于它的CompletionService。然后,我们提交了三个任务到CompletionService。使用take()方法,我们可以阻塞地等待并获取第一个完成的任务的结果。使用poll()方法,我们可以非阻塞地获取任务结果,或者在指定时间内等待。

请注意,在实际使用中,通常不会像示例中那样直接提交已经完成的任务,而是提交那些需要由ExecutorService异步执行的任务。这里的示例只是为了展示CompletionService的使用方法。

具体的实践案例可以参考我的另一篇博客:

使用CompletionService进行多个文件打包为zip下载_hutool zip 打包-CSDN博客

    暂且总结这些,后续应该还会再补充一些细节的内容。


原文地址:https://blog.csdn.net/u011174699/article/details/138256471

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!