有没有一种方法可以强制parallelStream()进行并行处理?
如果输入大小太小,则库会自动序列化 流中地图的执行,但是这种自动化不会并且也不会考虑地图操作的繁重程度。有没有办法
强制parallelStream()实际并行化CPU重映射?
回答:
似乎存在根本的误解。链接的“问答”讨论了由于OP没有 看到预期的加速,流显然不能并行工作。结论是,有没有好处在
并行处理工作负载是否太小,不,有一个自动回退到顺序执行。
实际上是相反的。如果您请求并行处理,即使实际上降低了性能,您也会得到并行处理。在这种情况下,实现不会切换到可能更有效的顺序执行。
因此,如果您确信每个元素的工作量都足够高,足以证明使用并行执行是合理的,而不管元素的数量如何,那么您可以简单地请求并行执行。
可以很容易地证明:
Stream.of(1, 2).parallel() .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.forEach(System.out::println);
On Ideone, it prints
processing 2 in Thread[main,5,main]2
processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main]
1
但是消息的顺序和详细信息可能会有所不同。甚至有可能在某些环境中,两个任务可能恰好由同一线程执行,
如果它可以在另一个线程开始将其提起之前将第二个任务进行加固。但是,当然,如果任务足够昂贵,就不会发生这种情况。在重要的一点是,总体工作量已经分裂,排队要由其他工作线程可能回升。
如果对于
上面的简单示例,您的环境中发生了单线程执行,则可以插入模拟的工作负载,如下所示:
Stream.of(1, 2).parallel() .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
.map(x -> {
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3));
return x;
})
.forEach(System.out::println);
然后,您可能还会看到,如果“ 每个元素的处理时间”足够长,则总执行时间将比“
元素数”ד每个元素的处理时间”短。
更新:误解可能是由Brian Goetz的误导性
陈述引起的:“在您的情况下,您的输入集太小而无法
分解”。
必须强调的是,这不是Stream API的常规属性,而是Map已使用的属性。AHashMap具有一个支持数组,并且
条目根据其哈希码分布在该数组中。这可能是分裂的数组的情况下ñ范围不会导致
被包含元素的均衡分裂,尤其是,如果只有两个。的实现者HashMap的Spliterator视为搜索
数组元素得到完美的平衡拆分是太贵了,不说拆分两个元素是不值得的。
由于HashMap的默认容量为16,并且示例仅包含两个元素,因此可以说地图太大了。简单地修复也可以
修复示例:
long start = System.nanoTime();Map<String, Supplier<String>> input = new HashMap<>(2);
input.put("1", () -> {
System.out.println(Thread.currentThread());
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
return "a";
});
input.put("2", () -> {
System.out.println(Thread.currentThread());
LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
return "b";
});
Map<String, String> results = input.keySet()
.parallelStream().collect(Collectors.toConcurrentMap(
key -> key,
key -> input.get(key).get()));
System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start));
on my machine, it prints
Thread[main,5,main]Thread[ForkJoinPool.commonPool-worker-1,5,main]
Time: 2058
结论是,如果您请求Stream实现,则无论输入大小如何,它始终尝试使用并行执行。但是,这取决于
输入的结构,可以将工作负载分配到工作线程的程度如何。事情甚至可能更糟,例如,如果您从文件中流式传输行。
如果您认为平衡拆分的好处值得进行复制,那么您也可以使用new ArrayList<>(input.keySet()).parallelStream()
代替
input.keySet().parallelStream()
,因为元素的分布ArrayList
始终可以实现平衡的拆分。
以上是 有没有一种方法可以强制parallelStream()进行并行处理? 的全部内容, 来源链接: utcz.com/qa/417236.html