集成multiprocessing.Process与concurrent.future._base.Future

我有创建子进程,使用未来接收结果,然后杀死其中一些需要时的要求。集成multiprocessing.Process与concurrent.future._base.Future

为此,我分类了multiprocessing.Process类,并从start()方法返回Future对象。

问题是我无法在cb()函数中接收到结果,因为它永远不会被调用。

请帮助/建议如果这可以用其他方式完成,或者我在当前实现中丢失了什么?

以下是我目前的做法

from multiprocessing import Process, Queue 

from concurrent.futures import _base

import threading

from time import sleep

def foo(x,q):

print('result {}'.format(x*x))

result = x*x

sleep(5)

q.put(result)

class MyProcess(Process):

def __init__(self, target, args):

super().__init__()

self.target = target

self.args = args

self.f = _base.Future()

def run(self):

q = Queue()

worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))

worker_thread.start()

r = q.get(block=True)

print('setting result {}'.format(r))

self.f.set_result(result=r)

print('done setting result')

def start(self):

f = _base.Future()

run_thread = threading.Thread(target=self.run)

run_thread.start()

return f

def cb(future):

print('received result in callback {}'.format(future))

def main():

p1 = MyProcess(target=foo, args=(2,))

f = p1.start()

f.add_done_callback(fn=cb)

sleep(10)

if __name__ == '__main__':

main()

print('Main thread dying')

回答:

在你开始你的方法创建一个新的未来,你再回来。这是你设定结果的不同未来,这个未来根本就没有用过。尝试:

def start(self): 

run_thread = threading.Thread(target=self.run)

run_thread.start()

return self.f

但是,您的代码存在更多问题。您覆盖该进程的start方法,将其替换为工作线程上的执行,因此实际上绕过了多处理。另外,您不应该导入_base模块,即从前导下划线中看到的实现细节。你应该导入concurrent.futures.Future(这是相同的类,但通过公共API)。

这确实使用多:

from multiprocessing import Process, Queue 

from concurrent.futures import Future

import threading

from time import sleep

def foo(x,q):

print('result {}'.format(x*x))

result = x*x

sleep(5)

q.put(result)

class MyProcess(Process):

def __init__(self, target, args):

super().__init__()

self.target = target

self.args = args

self.f = Future()

def run(self):

q = Queue()

worker_thread = threading.Thread(target=self.target, args=(self.args+ (q,)))

worker_thread.start()

r = q.get(block=True)

print('setting result {}'.format(r))

self.f.set_result(result=r)

print('done setting result')

def cb(future):

print('received result in callback {}: {}'.format(future, future.result()))

def main():

p1 = MyProcess(target=foo, args=(2,))

p1.f.add_done_callback(fn=cb)

p1.start()

p1.join()

sleep(10)

if __name__ == '__main__':

main()

print('Main thread dying')

而你在一个新的进程,现在已经是,产卵一个工作线程来执行你的目标函数真的不应该是必要的,你可以只执行你的目标函数直接代替。如果目标函数引发一个你不知道的异常,你的回调只会在成功时被调用。所以如果你解决了这个问题,那么你只需要:

from multiprocessing import Process 

from concurrent.futures import Future

import threading

from time import sleep

def foo(x):

print('result {}'.format(x*x))

result = x*x

sleep(5)

return result

class MyProcess(Process):

def __init__(self, target, args):

super().__init__()

self.target = target

self.args = args

self.f = Future()

def run(self):

try:

r = self.target(*self.args)

print('setting result {}'.format(r))

self.f.set_result(result=r)

print('done setting result')

except Exception as ex:

self.f.set_exception(ex)

def cb(future):

print('received result in callback {}: {}'.format(future, future.result()))

def main():

p1 = MyProcess(target=foo, args=(2,))

p1.f.add_done_callback(fn=cb)

p1.start()

p1.join()

sleep(10)

if __name__ == '__main__':

main()

print('Main thread dying')

这基本上就是ProcessPoolExecutor所做的。

以上是 集成multiprocessing.Process与concurrent.future._base.Future 的全部内容, 来源链接: utcz.com/qa/261691.html

回到顶部