自学内容网 自学内容网

JAVA 多线程之ForkJoin

Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。

        Fork:就是把一个大任务切分成若干小任务,并行执行;
        Join:就是合并这些小任务的执行结果,最终得到大任务的结果。

Fork/Join 根据工作窃取算法进行设计

工作窃取算法:
(work-stealing),是指某个线程从其他队列里窃取任务来执行。

为什么需要工作窃取算法?
  假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。
  而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;
  被窃取任务的线程永远从双端队列头部拿取任务执行;
  窃取任务线程永远从双端队列尾部拿取任务执行;

Fork/Join 框架设计:
        Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。
        抽象类ForkJoinTask提供了两个子抽象类
                RecursiveAction:没有返回结果的任务
                RecursiveTask:有返回结果的任务

        Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据
 然后合并这些数据

ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行

使用场景:
        可以采用分而治之的算法场景
        计算密集型的任务

示例:
计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。

package org.test.mem.thread.pool.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/Join 框架是jdk7提供的一个用于并行执行任务的框架,是一个把大任务分成若干小任务,最终汇总每个小任务结果,最终得到大任务计算结果的框架。
 * <p>
 * Fork:就是把一个大任务切分成若干小任务,并行执行;
 * Join:就是合并这些小任务的执行结果,最终得到大任务的结果。
 * <p>
 * Fork/Join 根据工作窃取算法进行设计
 * <p>
 * 工作窃取算法:(work-stealing),是指某个线程从其他队列里窃取任务来执行。
 *
 * 为什么需要工作窃取算法?
 * 假如有些线程一起工作,可能有些线程的工作早早结束,结束的线程与其等着,不如去帮其他线程干活,于是就去其他线程的队列里,窃取一个任务来执行。
 * 而在这时,他们会访问同一队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护;
 * 被窃取任务的线程永远从双端队列头部拿取任务执行;
 * 窃取任务线程永远从双端队列尾部拿取任务执行;
 *
 * Fork/Join 框架设计:
 * Fork:分割任务。首先,我们需要一个fork类,来把大任务拆分成子任务,有可能子任务还很大,需要不停的分割,直到分割出子任务足够小。
 *   抽象类ForkJoinTask提供了两个子抽象类
 *      RecursiveAction:没有返回结果的任务
 *      RecursiveTask:有返回结果的任务
 *
 * Join:执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取线程并执行。子任务执行完的结果同一放到一个队列里,启动一个线程从队列里拿数据
 * 然后合并这些数据
 *
 *  ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行
 *
 *
 * 使用场景:
 * 可以采用分而治之的算法场景
 * 计算密集型的任务
 *
 * 实例:
 * 计算 1+2+3+……+100,如果加数之间差值大于等于10,则拆分为子任务。
 *
 * */
public class ForkJoinTest extends RecursiveTask<Integer> {

    private static final int THRESHOLD = 10;
    private int start;
    private int end;

    public ForkJoinTest(int start, int end) {
        this.start = start;
        this.end = end;
    }


    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //计算任务
        ForkJoinTest task = new ForkJoinTest(1, 100);
        //异步执行任务
        ForkJoinTask<Integer> result = forkJoinPool.submit(task);

        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }


    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end - start < THRESHOLD) {
            for (int i = start; i <= end; i++) {
                sum += i;
//                System.out.println(Thread.currentThread().getName() + ":" + i + " sum:" + sum);
            }
            System.out.println(Thread.currentThread().getName() +" sum:" +sum +" start:"+start+" end:"+end);
            return sum;
        }else{
            int middle = (end + start) / 2;
            ForkJoinTest leftForkJoin = new ForkJoinTest(start, middle);
            ForkJoinTest rightForkJoin = new ForkJoinTest(middle + 1, end);
            leftForkJoin.fork();
            rightForkJoin.fork();
            int leftResult = leftForkJoin.join();
            int rightResult = rightForkJoin.join();
            sum = leftResult + rightResult;
            return sum;
        }
    }
}


原文地址:https://blog.csdn.net/qingwufeiyang_530/article/details/143601478

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!