java8parallelstream

coding

Parallel stream

Fork and join

Pseudocode

If(task is small enough or no longer divisible){

Compute task sequentially

}else{

split task in two subtasks

call this method recursively possibly further splitting  each subtask

wait for the completion of all subtasks

combine the results of each subtask}

 

 

 

 

public class ForkJoinSumCalculator extends RecursiveTask<Long> {

   
private final long[] numbers;
   
private final int start;
   
private final int end;
   
public static final long THRESHOLD = 10_1000;


   
public ForkJoinSumCalculator(long[] numbers) {
       
this(numbers, 0, numbers.length);

    }

   
public ForkJoinSumCalculator(long[] numbers, int start, int end) {
       
this.numbers = numbers;
       
this.start = start;
       
this.end = end;

    }


   
@Override
   
protected Long compute() {

       
int length = end - start;
       
if (length <= THRESHOLD) {
           
return computeSequentially();
        }
        ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
       
return leftResult + rightResult;
    }

   
private long computeSequentially() {
       
long sum = 0;
       
for (int i = start; i < end; i++) {
            sum +=
numbers[i];
        }
       
return sum;
    }

   
public static long forkJoinSum(long n){
       
long[] numbers= LongStream.rangeClosed(1,n).toArray();
        ForkJoinTask<Long> task=
new ForkJoinSumCalculator(numbers);
       
return new ForkJoinPool().invoke(task);
    }
}

@Test

public void test(){

    long i = 10000000;

    StopWatch stopWatch=new StopWatch();

    stopWatch.start();

    Function<Long,Long> adder=ForkJoinSumCalculator::forkJoinSum ;

    adder.apply(i);

    stopWatch.stop();

    System.out.println(stopWatch.getTotalTimeSeconds());

}

 

 

以上是 java8parallelstream 的全部内容, 来源链接: utcz.com/z/509710.html

回到顶部