【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

  • ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割的子任务会添加到当前工作维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其它工作线程的队列尾部获取一个任务。

  • ForkJoinTask:我们需要使用ForkJoin框架,首先要创建一个ForkJoin任务。它提供在任务中执行Fork()Join()操作的机制,通常情况下不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供以下两个子类。

  • RecursiveAction:用于没有返回值的任务。

  • RecursizeTask:用于有返回值的任务。

【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

Exception

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

/** ForkJoinTask运行状态 */

volatile int status; // 直接被ForkJoin池和工作线程访问

static final int DONE_MASK = 0xf0000000; // mask out non-completion bits

static final int NORMAL = 0xf0000000; // must be negative

static final int CANCELLED = 0xc0000000; // must be < NORMAL

static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED

static final int SIGNAL = 0x00010000; // must be >= 1 << 16

static final int SMASK = 0x0000ffff; // short bits for tags

/**

* @Ruturn 任务是否扔出异常或被取消

*/

public final boolean isCompletedAbnormally() {

return status < NORMAL;

}

/**

* 如果计算扔出异常,则返回异常

* 如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null

*/

public final Throwable getException() {

int s = status & DONE_MASK;

return ((s >= NORMAL) ? null :

(s == CANCELLED) ? new CancellationException() :

getThrowableException());

}

}

ForkJoinPool源码

public class ForkJoinPool extends AbstractExecutorService {

/**

* ForkJoinPool,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了

* 一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希

* 望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。

*/

public ForkJoinPool() {

this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),

defaultForkJoinWorkerThreadFactory, null, false);

}

public ForkJoinPool(int parallelism) {

this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);

}

//有多个构造器,这里省略

volatile WorkQueue[] workQueues; // main registry

static final class WorkQueue {

final ForkJoinWorkerThread owner; // 工作线程

ForkJoinTask<?>[] array; // 任务

//传入的是ForkJoinPool与指定的一个工作线程

WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {

this.pool = pool;

this.owner = owner;

// Place indices in the center of array (that is not yet allocated)

base = top = INITIAL_QUEUE_CAPACITY >>> 1;

}

}

}

FrokJoinPool work stealing算法

【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

public class ForkJoinPool extends AbstractExecutorService {

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}

public <T> ForkJoinTask<T> submit(Callable<T> task) {}

public <T> ForkJoinTask<T> submit(Runnable task, T result) {}

public ForkJoinTask<?> submit(Runnable task) {}

}

ForkJoinTask

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

/**

* 在当前任务正在运行的池中异步执行此任务(如果适用)

* 或使用ForkJoinPool.commonPool()(如果不是ForkJoinWorkerThread实例)进行异步执行

*/

public final ForkJoinTask<V> fork() {

Thread t;

if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)

((ForkJoinWorkerThread)t).workQueue.push(this);

else

ForkJoinPool.common.externalPush(this);

return this;

}

public final V join() {

int s;

if ((s = doJoin() & DONE_MASK) != NORMAL)

reportException(s);

return getRawResult();

}

private int doJoin() {

int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;

return (s = status) < 0 ? s :

((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?

(w = (wt = (ForkJoinWorkerThread)t).workQueue).

tryUnpush(this) && (s = doExec()) < 0 ? s :

wt.pool.awaitJoin(w, this, 0L) :

externalAwaitDone();

}

}

  • 检查调用join()的线程是否是ForkJoinThread线程。如果不是(例如main线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  • 检查任务的完成状态,如果已经完成,则直接返回结果。
  • 如果任务尚未完成,但是处理自己的工作队列,则完成它。
  • 如果任务已经被其它线程偷走,则这个小偷工作队列的任务以先进先出的方式执行,帮助小偷线程尽快完成join

  • 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要join的任务时,则找到小偷的小偷(递归执行),帮助它完成它的任务。

【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

ForkJoinPool.submit方法

public static void main(String[] args) throws ExecutionException, InterruptedException {

//生成一个池

ForkJoinPool forkJoinPool=new ForkJoinPool();

ForkJoinTask task=new ForkJoinExample(1, 100000);

ForkJoinTask<Integer> submit = forkJoinPool.submit(task);

Integer sum = submit.get();

System.out.println("最后的结果是:"+sum);

}

关注微信公众号:【入门小站】,解锁更多知识点。

【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool

以上是 【Java】Java中J.U.C扩展组件之ForkJoinTask和ForkJoinPool 的全部内容, 来源链接: utcz.com/a/104598.html

回到顶部