Python标准库multiprocessing基于进程的并行
源代码 Lib/multiprocessing/
概述¶
multiprocessing
是一个用与 threading
模块相似API的支持产生进程的包。 multiprocessing
包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing
模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。
multiprocessing
模块还引入了在 threading
模块中没有类似物的API。这方面的一个主要例子是 Pool
对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用 Pool
,
frommultiprocessingimportPooldeff(x):
returnx*x
if__name__=='__main__':
withPool(5)asp:
print(p.map(f,[1,2,3]))
将打印到标准输出
[1,4,9]
Process
类¶
在 multiprocessing
中,通过创建一个 Process
对象然后调用它的 start()
方法来生成进程。 Process
和 threading.Thread
API 相同。 一个简单的多进程程序示例是:
frommultiprocessingimportProcessdeff(name):
print('hello',name)
if__name__=='__main__':
p=Process(target=f,args=('bob',))
p.start()
p.join()
要显示所涉及的各个进程ID,这是一个扩展示例:
frommultiprocessingimportProcessimportos
definfo(title):
print(title)
print('module name:',__name__)
print('parent process:',os.getppid())
print('process id:',os.getpid())
deff(name):
info('function f')
print('hello',name)
if__name__=='__main__':
info('main line')
p=Process(target=f,args=('bob',))
p.start()
p.join()
为了解释为什么 if__name__=='__main__'
部分是必需的,请参见 编程指导。
上下文和启动方法¶
根据不同的平台, multiprocessing
支持三种启动进程的方法。这些 启动方法 有
- spawn
父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的
run()
方法所需的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。相对于使用 fork 或者 forkserver,使用这个方法启动进程相当慢。可在Unix和Windows上使用。 Windows上的默认设置。
- fork
父进程使用
os.fork()
来产生 Python 解释器分叉。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全分叉多线程进程是棘手的。只存在于Unix。Unix中的默认值。
- forkserver
程序启动并选择* forkserver * 启动方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。分叉服务器进程是单线程的,因此使用
os.fork()
是安全的。没有不必要的资源被继承。可在Unix平台上使用,支持通过Unix管道传递文件描述符。
在 3.4 版更改: spawn 在所有unix平台上添加,并且为一些unix平台添加了 forkserver 。子进程不再继承Windows上的所有上级进程可继承的句柄。
在Unix上使用 spawn 或 forkserver 启动方法也将启动一个 信号量跟踪器 进程,该进程跟踪由程序进程创建的未链接的命名信号量。当所有进程退出时,信号量跟踪器取消链接任何剩余的信号量。通常不应该有,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消链接命名的信号量是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前它们不会自动取消链接。)
要选择一个启动方法,你应该在主模块的 if__name__=='__main__'
子句中调用 set_start_method()
。例如:
importmultiprocessingasmpdeffoo(q):
q.put('hello')
if__name__=='__main__':
mp.set_start_method('spawn')
q=mp.Queue()
p=mp.Process(target=foo,args=(q,))
p.start()
print(q.get())
p.join()
在程序中 set_start_method()
不应该被多次调用。
或者,你可以使用 get_context()
来获取上下文对象。上下文对象与多处理模块具有相同的API,并允许在同一程序中使用多个启动方法。:
importmultiprocessingasmpdeffoo(q):
q.put('hello')
if__name__=='__main__':
ctx=mp.get_context('spawn')
q=ctx.Queue()
p=ctx.Process(target=foo,args=(q,))
p.start()
print(q.get())
p.join()
请注意,对象在不同上下文创建的进程间可能并不兼容。 特别是,使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。
想要使用特定启动方法的库应该使用 get_context()
以避免干扰库用户的选择。
警告
'spawn'
和 'forkserver'
启动方法当前不能在Unix上和“冻结的”可执行内容一同使用(例如,有类似 PyInstaller 和 cx_Freeze 的包产生的二进制文件)。 'fork'
启动方法可以使用。
在进程之间交换对象¶
multiprocessing
支持进程之间的两种通信通道:
队列
Queue
类是一个近似queue.Queue
的克隆。 例如:frommultiprocessingimportProcess,Queuedeff(q):
q.put([42,None,'hello'])
if__name__=='__main__':
q=Queue()
p=Process(target=f,args=(q,))
p.start()
print(q.get())# prints "[42, None, 'hello']"
p.join()
队列是线程和进程安全的。
管道
Pipe()
函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:frommultiprocessingimportProcess,Pipedeff(conn):
conn.send([42,None,'hello'])
conn.close()
if__name__=='__main__':
parent_conn,child_conn=Pipe()
p=Process(target=f,args=(child_conn,))
p.start()
print(parent_conn.recv())# prints "[42, None, 'hello']"
p.join()
返回的两个连接对象
Pipe()
表示管道的两端。每个连接对象都有send()
和recv()
方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
进程之间的同步¶
multiprocessing
包含来自 threading
的所有同步原语的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
frommultiprocessingimportProcess,Lockdeff(l,i):
l.acquire()
try:
print('hello world',i)
finally:
l.release()
if__name__=='__main__':
lock=Lock()
fornuminrange(10):
Process(target=f,args=(lock,num)).start()
不使用来自不同进程的锁输出容易产生混淆。
在进程之间共享状态¶
如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。
但是,如果你真的需要使用一些共享数据,那么 multiprocessing
提供了两种方法。
共享内存
可以使用
Value
或Array
将数据存储在共享内存映射中。例如,以下代码:frommultiprocessingimportProcess,Value,Arraydeff(n,a):
n.value=3.1415927
foriinrange(len(a)):
a[i]=-a[i]
if__name__=='__main__':
num=Value('d',0.0)
arr=Array('i',range(10))
p=Process(target=f,args=(num,arr))
p.start()
p.join()
print(num.value)
print(arr[:])
将打印
3.1415927[0,-1,-2,-3,-4,-5,-6,-7,-8,-9]
创建
num
和arr
时使用的'd'
和'i'
参数是array
模块使用的类型的 typecode :'d'
表示双精度浮点数,'i'
表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
模块,该模块支持创建从共享内存分配的任意ctypes对象。
服务器进程
由
Manager()
返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager()
返回的管理器支持类型:list
、dict
、Namespace
、Lock
、RLock
、Semaphore
、BoundedSemaphore
、Condition
、Event
、Barrier
、Queue
、Value
和Array
。例如frommultiprocessingimportProcess,Managerdeff(d,l):
d[1]='1'
d['2']=2
d[0.25]=None
l.reverse()
if__name__=='__main__':
withManager()asmanager:
d=manager.dict()
l=manager.list(range(10))
p=Process(target=f,args=(d,l))
p.start()
p.join()
print(d)
print(l)
将打印
{0.25:None,1:'1','2':2}[9,8,7,6,5,4,3,2,1,0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。
使用工作进程¶
Pool
类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
例如
frommultiprocessingimportPool,TimeoutErrorimporttime
importos
deff(x):
returnx*x
if__name__=='__main__':
# start 4 worker processes
withPool(processes=4)aspool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f,range(10)))
# print same numbers in arbitrary order
foriinpool.imap_unordered(f,range(10)):
print(i)
# evaluate "f(20)" asynchronously
res=pool.apply_async(f,(20,))# runs in *only* one process
print(res.get(timeout=1))# prints "400"
# evaluate "os.getpid()" asynchronously
res=pool.apply_async(os.getpid,())# runs in *only* one process
print(res.get(timeout=1))# prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results=[pool.apply_async(os.getpid,())foriinrange(4)]
print([res.get(timeout=1)forresinmultiple_results])
# make a single worker sleep for 10 secs
res=pool.apply_async(time.sleep,(10,))
try:
print(res.get(timeout=1))
exceptTimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,池的方法只能由创建它的进程使用。
注解
该软件包中的功能要求子项可以导入 __main__
模块。这包含在 编程指导 中,但值得指出。这意味着一些示例,例如 multiprocessing.pool.Pool
示例在交互式解释器中不起作用。例如:
>>> frommultiprocessingimportPool>>> p=Pool(5)
>>> deff(x):
... returnx*x
...
>>> withp:
... p.map(f,[1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果你尝试这个,它实际上会以半随机的方式输出三个完整的回溯,然后你可能不得不以某种方式停止主进程。)
参考¶
multiprocessing
包大部分复制了 threading
模块的API。
Process
和异常¶
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶进程对象表示在单独进程中运行的活动。
Process
类等价于threading.Thread
。应始终使用关键字参数调用构造函数。 group 应该始终是
None
;它仅用于兼容threading.Thread
。 target 是由run()
方法调用的可调用对象。它默认为None
,意味着什么都没有被调用。 name 是进程名称(有关详细信息,请参阅name
)。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数字典。如果提供,则键参数 daemon 将进程daemon
标志设置为True
或False
。如果是None
(默认值),则该标志将从创建的进程继承。默认情况下,不会将任何参数传递给 target 。
如果子类重写构造函数,它必须确保它在对进程执行任何其他操作之前调用基类构造函数(
Process.__init__()
)。在 3.3 版更改: 加入 daemon 参数。
run
()¶表示进程活动的方法。
你可以在子类中重载此方法。标准
run()
方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),分别从 args 和 kwargs 参数中获取顺序和关键字参数。
start
()¶启动进程活动。
每个进程对象最多只能调用一次。它安排对象的
run()
方法在一个单独的进程中调用。
join
([timeout])¶如果可选参数 timeout 是
None
(默认值),则该方法将阻塞,直到调用join()
方法的进程终止。如果 timeout 是一个正数,它最多会阻塞 timeout 秒。请注意,如果进程终止或方法超时,则该方法返回None
。检查进程的exitcode
以确定它是否终止。一个进程可以合并多次。
进程无法并入自身,因为这会导致死锁。尝试在启动进程之前合并进程是错误的。
name
¶进程的名称。该名称是一个字符串,仅用于识别目的。它没有语义。可以为多个进程指定相同的名称。
初始名称由构造器设定。 如果没有为构造器提供显式名称,则会构造一个形式为 'Process-N1:N2:...:Nk' 的名称,其中每个 Nk 是其父亲的第 N 个孩子。
is_alive
()¶返回进程是否还活着。
粗略地说,从
start()
方法返回到子进程终止之前,进程对象仍处于活动状态。
daemon
¶进程的守护标志,一个布尔值。这必须在
start()
被调用之前设置。初始值继承自创建进程。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,不允许在守护进程中创建子进程。这是因为当守护进程由于父进程退出而中断时,其子进程会变成孤儿进程。 另外,这些 不是 Unix 守护进程或服务,它们是正常进程,如果非守护进程已经退出,它们将被终止(并且不被合并)。
除了
threading.Thread
API ,Process
对象还支持以下属性和方法:pid
¶返回进程ID。在生成该进程之前,这将是
None
。
exitcode
¶的退子进程出代码。如果进程尚未终止,这将是
None
。负值 -N 表示孩子被信号 N 终止。
authkey
¶进程的身份验证密钥(字节字符串)。
当
multiprocessing
初始化时,主进程使用os.urandom()
分配一个随机字符串。当创建
Process
对象时,它将继承其父进程的身份验证密钥,尽管可以通过将authkey
设置为另一个字节字符串来更改。参见 认证密码 。
sentinel
¶系统对象的数字句柄,当进程结束时将变为 "ready" 。
如果要使用
multiprocessing.connection.wait()
一次等待多个事件,可以使用此值。否则调用join()
更简单。在Windows上,这是一个操作系统句柄,可以与
WaitForSingleObject
和WaitForMultipleObjects
系列API调用一起使用。在Unix上,这是一个文件描述符,可以使用来自select
模块的原语。3.3 新版功能.
terminate
()¶终止进程。 在Unix上,这是使用
SIGTERM
信号完成的;在Windows上使用TerminateProcess()
。 请注意,不会执行退出处理程序和finally子句等。请注意,进程的后代进程将不会被终止 —— 它们将简单地变成孤立的。
警告
如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并可能无法被其他进程使用。类似地,如果进程已获得锁或信号量等,则终止它可能导致其他进程死锁。
kill
()¶与
terminate()
相同,但在Unix上使用SIGKILL
信号。3.7 新版功能.
close
()¶关闭
Process
对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发ValueError
。一旦close()
成功返回,Process
对象的大多数其他方法和属性将引发ValueError
。3.7 新版功能.
注意
start()
、join()
、is_alive()
、terminate()
和exitcode
方法只能由创建进程对象的进程调用。Process
一些方法的示例用法:>>> importmultiprocessing,time,signal
>>> p=multiprocessing.Process(target=time.sleep,args=(1000,))
>>> print(p,p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p,p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p,p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode==-signal.SIGTERM
True
exception
multiprocessing.
ProcessError
¶所有
multiprocessing
异常的基类。
exception
multiprocessing.
BufferTooShort
¶当提供的缓冲区对象太小而无法读取消息时,
Connection.recv_bytes_into()
引发的异常。如果
e
是一个BufferTooShort
实例,那么e.args[0]
将把消息作为字节字符串给出。
exception
multiprocessing.
AuthenticationError
¶出现身份验证错误时引发。
exception
multiprocessing.
TimeoutError
¶有超时的方法超时时引发。
管道和队列¶
使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用同步原语,例如锁。
消息机制包含: Pipe()
(可以用于在两个进程间传递消息),以及队列(能够在多个生产者和消费者之间通信)。
Queue
, SimpleQueue
以及 JoinableQueue
都是多生产者,多消费者,并且实现了 FIFO 的队列类型,其表现与标准库中的 queue.Queue
类相似。 不同之处在于 Queue
缺少标准库的 queue.Queue
从 Python 2.5 开始引入的 task_done()
和 join()
方法。
如果你使用了 JoinableQueue
,那么你 必须 对每个已经移出队列的任务调用 JoinableQueue.task_done()
。 不然的话用于统计未完成任务的信号量最终会溢出并抛出异常。
另外还可以通过使用一个管理器对象创建一个共享队列,详见 数据管理器 。
注解
multiprocessing
使用了普通的 queue.Empty
和 queue.Full
异常去表示超时。 你需要从 queue
中导入它们,因为它们并不在 multiprocessing
的命名空间中。
注解
当一个对象被放入一个队列中时,这个对象首先会被一个后台线程用 pickle 序列化,并将序列化后的数据通过一个底层管道的管道传递到队列中。 这种做法会有点让人惊讶,但一般不会出现什么问题。 如果它们确实妨碍了你,你可以使用一个由管理器 manager 创建的队列替换它。
将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法
empty()
才会返回False
。而get_nowait()
可以不抛出queue.Empty
直接返回。如果有多个进程同时将对象放入队列,那么在队列的另一端接受到的对象可能是无序的。但是由同一个进程放入的多个对象的顺序在另一端输出时总是一样的。
警告
如果一个进程通过调用 Process.terminate()
或 os.kill()
在尝试使用 Queue
期间被终止了,那么队列中的数据很可能被破坏。 这可能导致其他进程在尝试使用该队列时遇到异常。
警告
正如刚才提到的,如果一个子进程将一些对象放进队列中 (并且它没有用 JoinableQueue.cancel_join_thread
方法),那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的。
这意味着,除非你确定所有放入队列中的对象都已经被消费了,否则如果你试图等待这个进程,你可能会陷入死锁中。相似地,如果该子进程不是后台进程,那么父进程可能在试图等待所有非后台进程退出时挂起。
注意用管理器创建的队列不存在这个问题,详见 编程指导 。
该 例子 展示了如何使用队列实现进程间通信。
multiprocessing.
Pipe
([duplex])¶返回一对
Connection`对象 ``(conn1,conn2)`
, 分别表示管道的两端。如果 duplex 被置为
True
(默认值),那么该管道是双向的。如果 duplex 被置为False
,那么该管道是单向的,即conn1
只能用于接收消息,而conn2
仅能用于发送消息。
class
multiprocessing.
Queue
([maxsize])¶返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。
一旦超时,将抛出标准库
queue
模块中常见的异常queue.Empty
和queue.Full
。除了
task_done()
和join()
之外,Queue
实现了标准库类queue.Queue
中所有的方法。qsize
()¶返回队列的大致长度。由于多线程或者多进程的上下文,这个数字是不可靠的。
注意,在 Unix 平台上,例如 Mac OS X ,这个方法可能会抛出
NotImplementedError
异常,因为该平台没有实现sem_getvalue()
。
empty
()¶如果队列是空的,返回
True
,反之返回False
。 由于多线程或多进程的环境,该状态是不可靠的。
full
()¶如果队列是满的,返回
True
,反之返回False
。 由于多线程或多进程的环境,该状态是不可靠的。
put
(obj[, block[, timeout]])¶将 obj 放入队列。如果可选参数 block 是
True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出queue.Full
异常。反之 (block 是False
时),仅当有可用缓冲槽时才放入对象,否则抛出queue.Full
异常 (在这种情形下 timeout 参数会被忽略)。
put_nowait
(obj)¶相当于
put(obj,False)
。
get
([block[, timeout]])¶从队列中取出并返回对象。如果可选参数 block 是
True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出queue.Empty
异常。反之 (block 是False
时),仅当有可用对象能够取出时返回,否则抛出queue.Empty
异常 (在这种情形下 timeout 参数会被忽略)。
get_nowait
()¶相当于
get(False)
。
multiprocessing.Queue
类有一些在queue.Queue
类中没有出现的方法。这些方法在大多数情形下并不是必须的。close
()¶指示当前进程将不会再往队列中放入对象。一旦所有缓冲区中的数据被写入管道之后,后台的线程会退出。这个方法在队列被gc回收时会自动调用。
join_thread
()¶等待后台线程。这个方法仅在调用了
close()
方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据都被写入管道中。默认情况下,如果一个不是队列创建者的进程试图退出,它会尝试等待这个队列的后台线程。这个进程可以使用
cancel_join_thread()
让join_thread()
方法什么都不做直接跳过。
cancel_join_thread
()¶防止
join_thread()
方法阻塞当前进程。具体而言,这防止进程退出时自动等待后台线程退出。详见join_thread()
。可能这个方法称为”
allow_exit_without_flush()
“ 会更好。这有可能会导致正在排队进入队列的数据丢失,大多数情况下你不需要用到这个方法,仅当你不关心底层管道中可能丢失的数据,只是希望进程能够马上退出时使用。
注解
该类的功能依赖于宿主操作系统具有可用的共享信号量实现。否则该类将被禁用,任何试图实例化一个
Queue
对象的操作都会抛出ImportError
异常,更多信息详见 bpo-3770 。后续说明的任何专用队列对象亦如此。
class
multiprocessing.
SimpleQueue
¶这是一个简化的
Queue
类的实现,很像带锁的Pipe
。empty
()¶如果队列为空返回
True
,否则返回False
。
get
()¶从队列中移出并返回一个对象。
put
(item)¶将 item 放入队列。
class
multiprocessing.
JoinableQueue
([maxsize])¶JoinableQueue
类是Queue
的子类,额外添加了task_done()
和join()
方法。task_done
()¶指出之前进入队列的任务已经完成。由队列的消费者进程使用。对于每次调用
get()
获取的任务,执行完成后调用task_done()
告诉队列该任务已经处理完成。如果
join()
方法正在阻塞之中,该方法会在所有对象都被处理完的时候返回 (即对之前使用put()
放进队列中的所有对象都已经返回了对应的task_done()
) 。如果被调用的次数多于放入队列中的项目数量,将引发
ValueError
异常 。
join
()¶阻塞至队列中所有的元素都被接收和处理完毕。
当条目添加到队列的时候,未完成任务的计数就会增加。每当消费者进程调用
task_done()
表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,join()
阻塞被解除。
杂项¶
multiprocessing.
active_children
()¶返回当前进程存活的子进程的列表。
调用该方法有“等待”已经结束的进程的副作用。
multiprocessing.
cpu_count
()¶返回系统的CPU数量。
该数量不同于当前进程可以使用的CPU数量。可用的CPU数量可以由
len(os.sched_getaffinity(0))
方法获得。可能引发
NotImplementedError
。参见
os.cpu_count()
multiprocessing.
current_process
()¶返回与当前进程相对应的
Process
对象。和
threading.current_thread()
相同。
multiprocessing.
freeze_support
()¶为使用了
multiprocessing
的程序,提供冻结以产生 Windows 可执行文件的支持。(在 py2exe, PyInstaller 和 cx_Freeze 上测试通过)需要在 main 模块的
if__name__=='__main__'
该行之后马上调用该函数。例如:frommultiprocessingimportProcess,freeze_support
deff():
print('hello world!')
if__name__=='__main__':
freeze_support()
Process(target=f).start()
如果没有调用
freeze_support()
在尝试运行被冻结的可执行文件时会抛出RuntimeError
异常。对
freeze_support()
的调用在非 Windows 平台上是无效的。如果该模块在 Windows 平台的 Python 解释器中正常运行 (该程序没有被冻结), 调用``freeze_support()`` 也是无效的。
multiprocessing.
get_all_start_methods
()¶返回支持的启动方法的列表,该列表的首项即为默认选项。可能的启动方法有
'fork'
,'spawn'
和``'forkserver'。在Windows中,只有 ``'spawn'
是可用的。Unix平台总是支持``'fork'`` 和``'spawn',且``'fork'
是默认值。3.4 新版功能.
multiprocessing.
get_context
(method=None)¶返回一个 Context 对象。该对象具有和
multiprocessing
模块相同的API。如果 method 设置成
None
那么将返回默认上下文对象。否则 method 应该是'fork'
,'spawn'
,'forkserver'
。 如果指定的启动方法不存在,将抛出ValueError
异常。3.4 新版功能.
multiprocessing.
get_start_method
(allow_none=False)¶返回启动进程时使用的启动方法名。
如果启动方法已经固定,并且 allow_none 被设置成 False ,那么启动方法将被固定为默认的启动方法,并且返回其方法名。如果启动方法没有设定,并且 allow_none 被设置成 True ,那么将返回
None
。返回值将为
'fork'
,'spawn'
,'forkserver'
或者None
。'fork'``是Unix的默认值, ``'spawn'
是 Windows 的默认值。3.4 新版功能.
multiprocessing.
set_executable
()¶设置在启动子进程时使用的 Python 解释器路径。 ( 默认使用
sys.executable
) 嵌入式编程人员可能需要这样做:set_executable(os.path.join(sys.exec_prefix,'pythonw.exe'))
以使他们可以创建子进程。
在 3.4 版更改: 现在在 Unix 平台上使用
'spawn'
启动方法时支持调用该方法。
multiprocessing.
set_start_method
(method)¶设置启动子进程的方法。 method 可以是
'fork'
,'spawn'
或者'forkserver'
。注意这最多只能调用一次,并且需要藏在 main 模块中,由
if__name__=='__main__'
保护着。3.4 新版功能.
注解
multiprocessing
并没有包含类似 threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, 或者 threading.local
的方法和类。
连接对象(Connection)¶
Connection 对象允许收发可以序列化的对象或字符串。它们可以看作面向消息的连接套接字。
通常使用 Pipe
创建 Connection 对象。详见 : 监听者及客户端.
class
multiprocessing.connection.
Connection
¶send
(obj)¶将一个对象发送到连接的另一端,可以用
recv()
读取。发送的对象必须是可以序列化的,过大的对象 ( 接近 32MiB+ ,这个值取决于操作系统 ) 有可能引发
ValueError
异常。
recv
()¶返回一个由另一端使用
send()
发送的对象。该方法会一直阻塞直到接收到对象。 如果对端关闭了连接或者没有东西可接收,将抛出EOFError
异常。
fileno
()¶返回由连接对象使用的描述符或者句柄。
close
()¶关闭连接对象。
当连接对象被垃圾回收时会自动调用。
poll
([timeout])¶返回连接对象中是否有可以读取的数据。
如果未指定 timeout ,此方法会马上返回。如果 timeout 是一个数字,则指定了最大阻塞的秒数。如果 timeout 是
None
,那么将一直等待,不会超时。注意通过使用
multiprocessing.connection.wait()
可以一次轮询多个连接对象。
send_bytes
(buffer[, offset[, size]])¶从一个 bytes-like object (字节类对象)对象中取出字节数组并作为一条完整消息发送。
如果由 offset 给定了在 buffer 中读取数据的位置。 如果给定了 size ,那么将会从缓冲区中读取多个字节。 过大的缓冲区 ( 接近 32MiB+ ,此值依赖于操作系统 ) 有可能引发
ValueError
异常。
recv_bytes
([maxlength])¶以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出
EOFError
异常。如果给定了 maxlength 并且消息长于 maxlength 那么将抛出
OSError
并且该连接对象将不再可读。在 3.3 版更改: 曾经该函数抛出
IOError
,现在这是OSError
的别名。
recv_bytes_into
(buffer[, offset])¶将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。 如果连接对象被对端关闭或者没有数据可读取,将抛出
EOFError
异常。buffer must be a writable bytes-like object. If
offset is given then the message will be written into the buffer from
that position. Offset must be a non-negative integer less than the
length of buffer (in bytes).
如果缓冲区太小,则将引发
BufferTooShort
异常,并且完整的消息将会存放在异常实例e
的e.args[0]
中。
在 3.3 版更改: 现在连接对象自身可以通过
Connection.send()
和Connection.recv()
在进程之间传递。3.3 新版功能: 连接对象现已支持上下文管理协议 -- 参见 see 上下文管理器类型 。
__enter__()
返回连接对象,__exit__()
会调用close()
。
例如:
>>> frommultiprocessingimportPipe>>> a,b=Pipe()
>>> a.send([1,'hello',None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> importarray
>>> arr1=array.array('i',range(5))
>>> arr2=array.array('i',[0]*10)
>>> a.send_bytes(arr1)
>>> count=b.recv_bytes_into(arr2)
>>> assertcount==len(arr1)*arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动解封它收到的数据,除非你能够信任发送消息的进程,否则此处可能有安全风险。
因此, 除非连接对象是由 Pipe()
产生的,否则你应该仅在使用了某种认证手段之后才使用 recv()
和 send()
方法。 参考 认证密码。
警告
如果一个进程在试图读写管道时被终止了,那么管道中的数据很可能是不完整的,因为此时可能无法确定消息的边界。
同步原语¶
通常来说同步原语在多进程环境中并不像它们在多线程环境中那么必要。参考 threading
模块的文档。
注意可以使用管理器对象创建同步原语,参考 数据管理器 。
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶类似
threading.Barrier
的栅栏对象。3.3 新版功能.
class
multiprocessing.
BoundedSemaphore
([value])¶非常类似
threading.BoundedSemaphore
的有界信号量对象。一个小小的不同在于,它的
acquire
方法的第一个参数名是和Lock.acquire()
一样的 block 。注解
在 Mac OS X 平台上, 该对象于
Semaphore
不同在于sem_getvalue()
方法并没有在该平台上实现。
class
multiprocessing.
Condition
([lock])¶条件变量:
threading.Condition
的别名。指定的 lock 参数应该是
multiprocessing
模块中的Lock
或者RLock
对象。在 3.3 版更改: 新增了
wait_for()
方法。
class
multiprocessing.
Event
¶A clone of
threading.Event
.
class
multiprocessing.
Lock
¶原始锁(非递归锁)对象,类似于
threading.Lock
。一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。任何进程或线程都可以释放锁。除非另有说明,否则multiprocessing.Lock
用于进程或者线程的概念和行为都和threading.Lock
一致。注意
Lock
实际上是一个工厂函数。它返回由默认上下文初始化的multiprocessing.synchronize.Lock
对象。Lock
supports the context manager protocol and thus may beused in
with
statements.acquire
(block=True, timeout=None)¶可以阻塞或非阻塞地获得锁。
如果 block 参数被设为
True
( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回True
。需要注意的是第一个参数名与threading.Lock.acquire()
的不同。如果 block 参数被设置成
False
,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回False
; 否则将锁设置成锁住状态,并返回True
。当 timeout 是一个正浮点数时,会在等待锁的过程中最多阻塞等待 timeout 秒,当 timeout 是负数时,效果和 timeout 为0时一样,当 timeout 是
None
(默认值)时,等待时间是无限长。需要注意的是,对于 timeout 是负数和None
的情况, 其行为与threading.Lock.acquire()
是不一样的。当 block 参数为False
时, timeout 并没有实际用处,会直接忽略, 当 block 参数为True
时,函数会在拿到锁后返回True
或者 超时没拿到锁后返回False
。
release
()¶释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。
其行为与
threading.Lock.release()
一样,只不过当尝试释放一个没有被持有的锁时,会抛出ValueError
异常。
class
multiprocessing.
RLock
¶递归锁对象: 类似于
threading.RLock
。递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。注意
RLock
是一个工厂函数,调用后返回一个使用默认 context 初始化的multiprocessing.synchronize.RLock
对象。RLock
支持 context manager 协议,因此可在with
语句内使用。acquire
(block=True, timeout=None)¶可以阻塞或非阻塞地获得锁。
当 block 设置为
True
时,会一直阻塞直到锁处于空闲状态(没有被任何进程、线程拥有),除非当前进程/线程已经拥有了这把锁。然后当前进程会持有这把锁(在锁没有被持有者的情况下),锁内的递归等级加一,并返回True
. 注意, 这个函数第一个参数和threading.RLock.acquire()
有几个不同点,包括参数名本身。当 block 参数是
False
, 将不会阻塞,如果此时锁被其他进程或者线程持有,当前进程、线程获取锁操作失败,锁的递归等级也不会改变,函数返回False
, 如果当前锁已经处于释放状态,则当前进程、线程则会拿到锁,并且锁内的递归等级加一,函数返回True
。timeout 参数的使用方法及行为与
Lock.acquire()
一样。但是要注意 timeout 的其中一些行为和threading.RLock.acquire()
中实现的行为是不同的。
release
()¶释放锁,使锁内的递归等级减一。如果释放后锁内的递归等级降低为0,则会重置锁的状态为释放状态(即没有被任何进程、线程持有),重置后如果有有其他进程和线程在等待这把锁,他们中的一个会获得这个锁而继续运行。如果释放后锁内的递归等级还没到达0,则这个锁仍将保持未释放状态且当前进程和线程仍然是持有者。
只有当前进程或线程是锁的持有者时,才允许调用这个方法。如果当前进程或线程不是这个锁的拥有者,或者这个锁处于已释放的状态(即没有任何拥有者),调用这个方法会抛出
AssertionError
异常。注意这里抛出的异常类型和threading.RLock.release()
中实现的行为不一样。
class
multiprocessing.
Semaphore
([value])¶一种信号量对象: 类似于
threading.Semaphore
.一个小小的不同在于,它的
acquire
方法的第一个参数名是和Lock.acquire()
一样的 block 。
注解
在 Mac OS X 上,不支持 sem_timedwait
,所以,使用调用 acquire()
时如果使用 timeout 参数,会通过循环sleep来模拟timeout的行为。
注解
假如信号 SIGINT 是来自于 Ctrl-C ,并且主线程被 BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
或 Condition.wait()
阻塞,则调用会立即中断同时抛出 KeyboardInterrupt
异常。
这和 threading
的行为不同,此模块中当执行对应的阻塞式调用时,SIGINT 会被忽略。
注解
这个库的某些功能依赖于宿主机系统的共享信号量,如果系统没有这个特性, multiprocessing.synchronize
会被禁用,尝试导入这个包会引发 ImportError
异常,详细信息请查看 bpo-3770 。