在使用Java8并行流时的问题分析

java

最近在使用Java8的并行流时遇到了坑,线上排查问题时花了较多时间,分享出来与大家一起学习与自查

// 此处为坑

List<Java8Demo> copy = Lists.newArrayList();

numbers.parallelStream().forEach(item -> {

copy.add(new Java8Demo(item));

});

上图用到了parallelStrem并行流,在循环内部往共享变量copy内写值,由于ArrayList本身不具备线程安全性,导致得到的copy内容有缺失。

总结经验如下:

  1. 在并行流内部不能对外部共享变量做写操作;

  2. 如有需要,使用收集器实现上述并行流,收集器在内部即使使用ArrayList,也不会造成问题!

提供两种解决方案:

  • 串行

    // stream串行

    List<Java8Demo> copy = Lists.newArrayList();

    numbers.stream().forEach(item -> {

    copy.add(new Java8Demo(item));

    });

  • 收集器

    // 并行使用收集器

    List<Java8Demo> copy = numbers.parallelStream().map(Java8Demo::new).collect(Collectors.toList());

可运行Demo.java

package acc.biz.impl;

import java.util.Arrays;

import java.util.List;

import java.util.stream.Collectors;

import com.google.common.collect.Lists;

public class Demo {

private Integer value;

public Demo(Integer value) {

this.value = value;

}

public static List<Integer> numbers = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

public static void main(String[] args) {

/** parallelStream并行 */

int count1 = 1;

while (count1 < 100) {

// 此处为坑

List<Demo> copy = Lists.newArrayList();

numbers.parallelStream().forEach(item -> {

copy.add(new Demo(item));

});

// 打印错误

if (copy.size() != numbers.size()) {

System.out.println(

new StringBuilder().append("parallelStream循环第").append(count1).append("次报错,numbers.size: [")

.append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));

break;

}

count1++;

}

/** stream串行 */

int count2 = 1;

while (count2 < 100) {

// stream串行

List<Demo> copy = Lists.newArrayList();

numbers.stream().forEach(item -> {

copy.add(new Demo(item));

});

// 打印错误

if (copy.size() != numbers.size()) {

System.out.println(new StringBuilder().append("stream循环第").append(count2).append("次报错,numbers.size: [")

.append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));

break;

}

count2++;

}

/** Collectors并行 */

int count3 = 1;

while (count3 < 100) {

// 并行使用收集器

List<Demo> copy = numbers.parallelStream().map(Demo::new).collect(Collectors.toList());

// 打印错误

if (copy.size() != numbers.size()) {

System.out.println(

new StringBuilder().append("Collectors循环第").append(count3).append("次报错,numbers.size: [")

.append(numbers.size()).append("],copy.size: [").append(copy.size()).append("]"));

break;

}

count3++;

}

}

public Integer getValue() {

return value;

}

public void setValue(Integer value) {

this.value = value;

}

}

 分享一个知识点:

Java的一个新功能静态方法指定泛型类型。

Java8 的静态泛型方法是: YourClass.<T>yourMethod();

C# 的静态泛型方法是: YourClass<T>.yourMethod();

如下的例子:

package demo;

import j.m.XList;

import j.m.XMap;

public class Main {

public static void main(String[] args) {

//public static final <U> XList<U> fromJSON(String json); 这是j.jar中的方法原型,

double s = XList.<XMap<String, Object>>fromJSON("[{id:1,name:'aaa',money:1000.00},{id:2,name:'bbb',money:10000.00},{id:3,name:'ccc',money:30000.00}]")

.parallelStream()

.map(x -> x.getDouble("money"))//不指定的话这里的x类型是Object

.reduce(0.0, Double::sum);//用到了并行流,不可用累加到外部变量的方式求和

System.out.println(s);//41000

}

}

以上是 在使用Java8并行流时的问题分析 的全部内容, 来源链接: utcz.com/z/392511.html

回到顶部