如何在python numpy中并行化求和计算?

我正在尝试计算总和,并且在并行化代码方面遇到困难。我试图并行化的计算有点复杂(它同时使用了numpy数组和scipy稀疏矩阵)。它吐出一个numpy数组,我想对大约1000个计算中的输出数组求和。理想情况下,我将在所有迭代中保持运行总和。但是,我还无法弄清楚该如何做。

到目前为止,我已经尝试将joblib的Parallel函数和pool.map函数与python的多处理程序包一起使用。对于这两种情况,我都使用一个内部函数返回一个numpy数组。这些函数返回一个列表,我将其转换为numpy数组,然后求和。

但是,在joblib

Parallel函数完成所有迭代之后,主程序将永远不会继续运行(看起来原始作业处于挂起状态,使用0%CPU)。当我使用pool.map时,所有迭代完成后都会出现内存错误。

有没有一种方法可以简单地并行化运行中的数组总和?

:目标是除了并行执行以下操作。

def summers(num_iters):

sumArr = np.zeros((1,512*512)) #initialize sum

for index in range(num_iters):

sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array

return sumArr

回答:

我想出了如何使用多处理,apply_async和回调将数组的总和并行化,所以我将其发布在这里供其他人使用。我使用的示例页面并行的Python的总和回调类,虽然我没有真正使用该程序包实施。不过,它给了我使用回调的想法。这是我最终使用的简化代码,它可以完成我想要的操作。

import multiprocessing

import numpy as np

import thread

class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments)

def __init__(self):

self.value = np.zeros((1,512*512)) #this is the initialization of the sum

self.lock = thread.allocate_lock()

self.count = 0

def add(self,value):

self.count += 1

self.lock.acquire() #lock so sum is correct if two processes return at same time

self.value += value #the actual summation

self.lock.release()

def computation(index):

array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes

return array1

def summers(num_iters):

pool = multiprocessing.Pool(processes=8)

sumArr = Sum() #create an instance of callback class and zero the sum

for index in range(num_iters):

singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add)

pool.close()

pool.join() #waits for all the processes to finish

return sumArr.value

我还可以使用并行映射来完成此工作,这在另一个答案中建议。我已经尝试过了,但是没有正确实现。两种方法都有效,我认为这个答案很好地说明了使用哪种方法(映射或apply.async)的问题。对于地图版本,您无需定义Sum类,summers函数将变为

def summers(num_iters):

pool = multiprocessing.Pool(processes=8)

outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these

sumArr = np.zeros((1,512*512)) #but I do to make sure I have the memory

outputArr = np.array(pool.map(computation, range(num_iters)))

sumArr = outputArr.sum(0)

pool.close() #not sure if this is still needed since map waits for all iterations

return sumArr

以上是 如何在python numpy中并行化求和计算? 的全部内容, 来源链接: utcz.com/qa/400753.html

回到顶部