Python多进程multiprocessing用法实例分析

本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:

mutilprocess简介

像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。

简单的创建进程:

import multiprocessing

def worker(num):

"""thread worker function"""

print 'Worker:', num

return

if __name__ == '__main__':

jobs = []

for i in range(5):

p = multiprocessing.Process(target=worker, args=(i,))

jobs.append(p)

p.start()

确定当前的进程,即是给进程命名,方便标识区分,跟踪

import multiprocessing

import time

def worker():

name = multiprocessing.current_process().name

print name, 'Starting'

time.sleep(2)

print name, 'Exiting'

def my_service():

name = multiprocessing.current_process().name

print name, 'Starting'

time.sleep(3)

print name, 'Exiting'

if __name__ == '__main__':

service = multiprocessing.Process(name='my_service',

target=my_service)

worker_1 = multiprocessing.Process(name='worker 1',

target=worker)

worker_2 = multiprocessing.Process(target=worker) # default name

worker_1.start()

worker_2.start()

service.start()

守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了

守护进程:

import multiprocessing

import time

import sys

def daemon():

name = multiprocessing.current_process().name

print 'Starting:', name

time.sleep(2)

print 'Exiting :', name

def non_daemon():

name = multiprocessing.current_process().name

print 'Starting:', name

print 'Exiting :', name

if __name__ == '__main__':

d = multiprocessing.Process(name='daemon',

target=daemon)

d.daemon = True

n = multiprocessing.Process(name='non-daemon',

target=non_daemon)

n.daemon = False

d.start()

n.start()

d.join(1)

print 'd.is_alive()', d.is_alive()

n.join()

最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态

终止进程:

import multiprocessing

import time

def slow_worker():

print 'Starting worker'

time.sleep(0.1)

print 'Finished worker'

if __name__ == '__main__':

p = multiprocessing.Process(target=slow_worker)

print 'BEFORE:', p, p.is_alive()

p.start()

print 'DURING:', p, p.is_alive()

p.terminate()

print 'TERMINATED:', p, p.is_alive()

p.join()

print 'JOINED:', p, p.is_alive()

①. == 0 未生成任何错误 

②. 0 进程有一个错误,并以该错误码退出

③. < 0 进程由一个-1 * exitcode信号结束

进程的退出状态:

import multiprocessing

import sys

import time

def exit_error():

sys.exit(1)

def exit_ok():

return

def return_value():

return 1

def raises():

raise RuntimeError('There was an error!')

def terminated():

time.sleep(3)

if __name__ == '__main__':

jobs = []

for f in [exit_error, exit_ok, return_value, raises, terminated]:

print 'Starting process for', f.func_name

j = multiprocessing.Process(target=f, name=f.func_name)

jobs.append(j)

j.start()

jobs[-1].terminate()

for j in jobs:

j.join()

print '%15s.exitcode = %s' % (j.name, j.exitcode)

方便的调试,可以用logging

日志:

import multiprocessing

import logging

import sys

def worker():

print 'Doing some work'

sys.stdout.flush()

if __name__ == '__main__':

multiprocessing.log_to_stderr()

logger = multiprocessing.get_logger()

logger.setLevel(logging.INFO)

p = multiprocessing.Process(target=worker)

p.start()

p.join()

利用class来创建进程,定制子类

派生进程:

import multiprocessing

class Worker(multiprocessing.Process):

def run(self):

print 'In %s' % self.name

return

if __name__ == '__main__':

jobs = []

for i in range(5):

p = Worker()

jobs.append(p)

p.start()

for j in jobs:

j.join()

python进程间传递消息:

import multiprocessing

class MyFancyClass(object):

def __init__(self, name):

self.name = name

def do_something(self):

proc_name = multiprocessing.current_process().name

print 'Doing something fancy in %s for %s!' % \

(proc_name, self.name)

def worker(q):

obj = q.get()

obj.do_something()

if __name__ == '__main__':

queue = multiprocessing.Queue()

p = multiprocessing.Process(target=worker, args=(queue,))

p.start()

queue.put(MyFancyClass('Fancy Dan'))

# Wait for the worker to finish

queue.close()

queue.join_thread()

p.join()

import multiprocessing

import time

class Consumer(multiprocessing.Process):

def __init__(self, task_queue, result_queue):

multiprocessing.Process.__init__(self)

self.task_queue = task_queue

self.result_queue = result_queue

def run(self):

proc_name = self.name

while True:

next_task = self.task_queue.get()

if next_task is None:

# Poison pill means shutdown

print '%s: Exiting' % proc_name

self.task_queue.task_done()

break

print '%s: %s' % (proc_name, next_task)

answer = next_task()

self.task_queue.task_done()

self.result_queue.put(answer)

return

class Task(object):

def __init__(self, a, b):

self.a = a

self.b = b

def __call__(self):

time.sleep(0.1) # pretend to take some time to do the work

return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

def __str__(self):

return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':

# Establish communication queues

tasks = multiprocessing.JoinableQueue()

results = multiprocessing.Queue()

# Start consumers

num_consumers = multiprocessing.cpu_count() * 2

print 'Creating %d consumers' % num_consumers

consumers = [ Consumer(tasks, results)

for i in xrange(num_consumers) ]

for w in consumers:

w.start()

# Enqueue jobs

num_jobs = 10

for i in xrange(num_jobs):

tasks.put(Task(i, i))

# Add a poison pill for each consumer

for i in xrange(num_consumers):

tasks.put(None)

# Wait for all of the tasks to finish

tasks.join()

# Start printing results

while num_jobs:

result = results.get()

print 'Result:', result

num_jobs -= 1

Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。

进程间信号传递:

import multiprocessing

import time

def wait_for_event(e):

"""Wait for the event to be set before doing anything"""

print 'wait_for_event: starting'

e.wait()

print 'wait_for_event: e.is_set()->', e.is_set()

def wait_for_event_timeout(e, t):

"""Wait t seconds and then timeout"""

print 'wait_for_event_timeout: starting'

e.wait(t)

print 'wait_for_event_timeout: e.is_set()->', e.is_set()

if __name__ == '__main__':

e = multiprocessing.Event()

w1 = multiprocessing.Process(name='block',

target=wait_for_event,

args=(e,))

w1.start()

w2 = multiprocessing.Process(name='nonblock',

target=wait_for_event_timeout,

args=(e, 2))

w2.start()

print 'main: waiting before calling Event.set()'

time.sleep(3)

e.set()

print 'main: event is set'

Python多进程,一般的情况是Queue来传递。

Queue:

from multiprocessing import Process, Queue

def f(q):

q.put([42, None, 'hello'])

if __name__ == '__main__':

q = Queue()

p = Process(target=f, args=(q,))

p.start()

print q.get() # prints "[42, None, 'hello']"

p.join()

多线程优先队列Queue:

import Queue

import threading

import time

exitFlag = 0

class myThread (threading.Thread):

def __init__(self, threadID, name, q):

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.q = q

def run(self):

print "Starting " + self.name

process_data(self.name, self.q)

print "Exiting " + self.name

def process_data(threadName, q):

while not exitFlag:

queueLock.acquire()

if not workQueue.empty():

data = q.get()

queueLock.release()

print "%s processing %s" % (threadName, data)

else:

queueLock.release()

time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]

nameList = ["One", "Two", "Three", "Four", "Five"]

queueLock = threading.Lock()

workQueue = Queue.Queue(10)

threads = []

threadID = 1

# Create new threads

for tName in threadList:

thread = myThread(threadID, tName, workQueue)

thread.start()

threads.append(thread)

threadID += 1

# Fill the queue

queueLock.acquire()

for word in nameList:

workQueue.put(word)

queueLock.release()

# Wait for queue to empty

while not workQueue.empty():

pass

# Notify threads it's time to exit

exitFlag = 1

# Wait for all threads to complete

for t in threads:

t.join()

print "Exiting Main Thread"

多进程使用Queue通信的例子

import time

from multiprocessing import Process,Queue

MSG_QUEUE = Queue(5)

def startA(msgQueue):

while True:

if msgQueue.empty() > 0:

print ('queue is empty %d' % (msgQueue.qsize()))

else:

msg = msgQueue.get()

print( 'get msg %s' % (msg,))

time.sleep(1)

def startB(msgQueue):

while True:

msgQueue.put('hello world')

print( 'put hello world queue size is %d' % (msgQueue.qsize(),))

time.sleep(3)

if __name__ == '__main__':

processA = Process(target=startA,args=(MSG_QUEUE,))

processB = Process(target=startB,args=(MSG_QUEUE,))

processA.start()

print( 'processA start..')

主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python Socket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》

希望本文所述对大家Python程序设计有所帮助。

以上是 Python多进程multiprocessing用法实例分析 的全部内容, 来源链接: utcz.com/z/313740.html

回到顶部