有没有一种方法可以强制parallelStream()进行并行处理?

如果输入大小太小,则库会自动序列化 流中地图的执行,但是这种自动化不会并且也不会考虑地图操作的繁重程度。有没有办法

强制parallelStream()实际并行化CPU重映射?

回答:

似乎存在根本的误解。链接的“问答”讨论了由于OP没有 看到预期的加速,流显然不能并行工作。结论是,有没有好处在

并行处理工作负载是否太小,不,有一个自动回退到顺序执行。

实际上是相反的。如果您请求并行处理,即使实际上降低了性能,您也会得到并行处理。在这种情况下,实现不会切换到可能更有效的顺序执行。

因此,如果您确信每个元素的工作量都足够高,足以证明使用并行执行是合理的,而不管元素的数量如何,那么您可以简单地请求并行执行。

可以很容易地证明:

Stream.of(1, 2).parallel()

.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))

.forEach(System.out::println);

On Ideone, it prints

processing 2 in Thread[main,5,main]

2

processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main]

1

但是消息的顺序和详细信息可能会有所不同。甚至有可能在某些环境中,两个任务可能恰好由同一线程执行,

如果它可以在另一个线程开始将其提起之前将第二个任务进行加固。但是,当然,如果任务足够昂贵,就不会发生这种情况。在重要的一点是,总体工作量已经分裂,排队要由其他工作线程可能回升。

如果对于

上面的简单示例,您的环境中发生了单线程执行,则可以插入模拟的工作负载,如下所示:

Stream.of(1, 2).parallel()

.peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))

.map(x -> {

LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3));

return x;

})

.forEach(System.out::println);

然后,您可能还会看到,如果“ 每个元素的处理时间”足够长,则总执行时间将比“

元素数”ד每个元素的处理时间”短。

更新:误解可能是由Brian Goetz的误导性

陈述引起的:“在您的情况下,您的输入集太小而无法

分解”。

必须强调的是,这不是Stream API的常规属性,而是Map已使用的属性。AHashMap具有一个支持数组,并且

条目根据其哈希码分布在该数组中。这可能是分裂的数组的情况下ñ范围不会导致

被包含元素的均衡分裂,尤其是,如果只有两个。的实现者HashMap的Spliterator视为搜索

数组元素得到完美的平衡拆分是太贵了,不说拆分两个元素是不值得的。

由于HashMap的默认容量为16,并且示例仅包含两个元素,因此可以说地图太大了。简单地修复也可以

修复示例:

long start = System.nanoTime();

Map<String, Supplier<String>> input = new HashMap<>(2);

input.put("1", () -> {

System.out.println(Thread.currentThread());

LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));

return "a";

});

input.put("2", () -> {

System.out.println(Thread.currentThread());

LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));

return "b";

});

Map<String, String> results = input.keySet()

.parallelStream().collect(Collectors.toConcurrentMap(

key -> key,

key -> input.get(key).get()));

System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start));

on my machine, it prints

Thread[main,5,main]

Thread[ForkJoinPool.commonPool-worker-1,5,main]

Time: 2058

结论是,如果您请求Stream实现,则无论输入大小如何,它始终尝试使用并行执行。但是,这取决于

输入的结构,可以将工作负载分配到工作线程的程度如何。事情甚至可能更糟,例如,如果您从文件中流式传输行。

如果您认为平衡拆分的好处值得进行复制,那么您也可以使用new ArrayList<>(input.keySet()).parallelStream()代替

input.keySet().parallelStream(),因为元素的分布ArrayList始终可以实现平衡的拆分。

以上是 有没有一种方法可以强制parallelStream()进行并行处理? 的全部内容, 来源链接: utcz.com/qa/417236.html

回到顶部