第十二章 并行Stream流
目录
一、引言
串行的Stream流:前面章节我们使用的Stream流是串行的,就是在一个线程上执行。运行上图中的串行流,我们查看下图中的运行结果,可以看到运行的都是同一个主线程:那么JDK 8中的并行的Stream流,即parallelStream,其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。
二、获取并行Stream流的两种方式
1. 直接获取并行的流
2. 将串行流转成并行流 并行操作代码:
运行结果:
三、并行和串行Stream流的效率对比
使用for循环,串行Stream流,并行Stream流来对5亿个数字求和。看消耗的时间。
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.stream.LongStream;
public class StreamTest {
private static long times = 500000000L;
private long start;
@Before
public void init() {
start = System.currentTimeMillis();
}
@After
public void destroy() {
long end = System.currentTimeMillis();
System.out.println("消耗时间:" + (end - start));
}
// 测试效率,parallelStream 120
@Test
public void parallelStream() {
System.out.println("serialStream");
LongStream.rangeClosed(0, times)
.parallel()
.reduce(0, Long::sum);
}
// 测试效率,普通Stream 342
@Test
public void serialStream() {
System.out.println("serialStream");
LongStream.rangeClosed(0, times)
.reduce(0, Long::sum);
}
// 测试效率,正常for循环 421
@Test
public void forAdd() {
System.out.println("forAdd");
long result = 0L;
for (long i = 1L; i < times; i++) {
result += i;
}
}
}
我们可以看到parallelStream的效率是最高的,Stream并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。
四、parallelStream线程安全问题
并行流为我们带来高性能的同时,也带来了线程安全性问题:
运行效果:
我们可以看到,往集合中添加1000个元素,而实际上只有917个元素,这就是并发处理时线程安全问题导致的。
解决方法: 加锁、使用线程安全的集合或者调用Stream的 toArray() / collect() 操作就是满足线程安全的了。
package com.wzx;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Test {
public static void main(String[] args) {
ArrayList<Integer> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
// 解决parallelStream线程安全问题方案一:使用同步代码块
// Object obj = new Object();
// List<Integer> newList = new ArrayList<>();
// 使用并行的流往集合中添加数据
// list.parallelStream()
// .forEach(s -> {
// synchronized (obj) {
// newList.add(s);
// }
// });
// 解决parallelStream线程安全问题方案二:使用线程安全的集合
// Vector<Integer> newList = new Vector<>();
// 1. 使用并行的流往集合中添加数据
// list.parallelStream()
// .forEach(s -> {
// newList.add(s);
// });
// 2. 使用Collections.synchronizedList()
// List<Integer> newList = new ArrayList<>();
// List<Integer> synchronizedList = Collections.synchronizedList(newList);
// 使用并行的流往集合中添加数据
// list.parallelStream()
// .forEach(s -> {
// synchronizedList.add(s);
// });
// 解决parallelStream线程安全问题方案三:调用Stream流的collect/toArray
List<Integer> newList = IntStream.rangeClosed(1, 1000).parallel().boxed().collect(Collectors.toList());
System.out.println("newList = " + newList.size());
}
}
五、parallelStream背后的技术
5.1. Fork/Join框架介绍
parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小任务来异步执行。 Fork/Join框架主要包含三个模块:
1. 线程池:ForkJoinPool
2. 任务对象:ForkJoinTask
3. 执行任务的线程:ForkJoinWorkerThread
5.2. Fork/Join原理-分治法
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
5.3. Fork/Join原理-工作窃取算法
Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念,Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。
对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。可以通过设置系统属性:java.util.concurrent.ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果。
5.4. Fork/Join案例
需求:使用Fork/Join计算1-10000的和,当一个任务的计算数量大于3000时拆分任务,数量小于3000时计算。
package com.wzx;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class Test {
public static void main(String[] args) {
long start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
SumRecursiveTask task = new SumRecursiveTask(1, 10000L);
Long result = pool.invoke(task);
System.out.println("result = " + result);
long end = System.currentTimeMillis();
System.out.println("消耗的时间为: " + (end - start));
}
}
class SumRecursiveTask extends RecursiveTask<Long> {
private static final long THRESHOLD = 3000L;
private final long start;
private final long end;
public SumRecursiveTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
// 任务不用再拆分了.可以计算了
if (length <= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
System.out.println("计算: " + start + " -> " + end + ",结果为: " + sum);
return sum;
// 数量大于预定的数量,任务还需要再拆分
} else {
long middle = (start + end) / 2;
System.out.println("拆分: 左边 " + start + " -> " + middle + ", 右边 " + (middle +
1) + " -> " + end);
SumRecursiveTask left = new SumRecursiveTask(start, middle);
left.fork();
SumRecursiveTask right = new SumRecursiveTask(middle + 1, end);
right.fork();
return left.join() + right.join();
}
}
}
原文地址:https://blog.csdn.net/qushaming/article/details/143893951
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!