04_进程池

python

1.为什么用进程

    1.在需要频繁的创建删除较多进程的情况下,导致计算机资源消耗过多
    2.进程池则是创建指定进程数量等待执行事件,避免了不必要的创建和销毁过程

2.进程池的使用步骤

    1.创建进程池,在池内放入适量的进程,将事件加入进程池的等待队列


    2.使用进程池中的进程不断处理事件,所有事件处理后回收关闭进程池

3.语法概述

from multiprocessing import Pool

pool = Pool(processes=4) # 创建指定进程数量进程池并返回进程池对象

# 异步方式将事件放入进程池执行,返回一个对象该对象

# callback: 回调函数,每当进程池中的进程处理完任务了,返回的结果交给回调函数,由回调函数进一步处理,回调函数只有异步时才有

ret = pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

参数:

func要执行的事件函数,可以通过返回值对象的 ret.get() 方法得到func函数的返回值

args: 位置参数元组,要给函数传递的参数

kwargs: 键值对参数字典,要给函数传递的参数

callback: 进程的任务函数返回值被当做回调函数的形参接收到,以此进行进一步的处理操作,回调函数是主进程调用的

error_callback: 错误时主进程执行的回调函数

pool.close() # 关闭进程池,使其无法加入新的事件

pool.join() # 阻塞等待进程池退出(当所有事情处理完毕后)

pool.apply() # 用法和 pool.apply_async() 一样,但是没有返回值,异步方式将事件放入进程池顺序执行,一个事件结束再执行另一个事件

func_list = pool.map(func, iter) # 类似于内建函数map,将第二个参数的迭代数传递给第一个参数的函数执行,同时兼容了使用进程池执行

# 示例:

from multiprocessing import Pool

import time

def fun(num):

time.sleep(1)

return num * num

test = [1, 2, 3, 4, 5, 6]

pool = Pool(3)

r = pool.map(fun, test) # 返回fun函数的返回值列表

# 上面折行代码等同于以下注释部分的代码

# r = []

# for i in test:

# res = pool.apply_async(fun, (i,))

# r.append(res.get())

print(r)

pool.close()

pool.join()

4.查看进程池中进程的进程号

from multiprocessing import Pool

import os

import time

import random

import sys

def worker(msg):

t_start = time.time()

print("%s开始执行,进程号为%d" % (msg, os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random() * 2)

t_stop = time.time()

print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))

return"函数{}-{}-{}".format(sys._getframe().f_code.co_name, str(msg), "over")

ret = list()

po = Pool(3) # 定义一个进程池,最大进程数3

for i in range(0, 10):

# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))

# 每次循环将会用空闲出来的子进程去调用目标

r = po.apply_async(worker, (i,))

ret.append(r)

print("----start----")

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

for r in ret:

print(r.get()) # 可以通过返回值对象的 r.get() 方法得到worker函数的返回值

print("-----end-----")

"""执行结果

----start----

0开始执行,进程号为23515

1开始执行,进程号为23516

2开始执行,进程号为23517

1 执行完毕,耗时0.07

3开始执行,进程号为23516

3 执行完毕,耗时0.08

4开始执行,进程号为23516

4 执行完毕,耗时0.66

5开始执行,进程号为23516

2 执行完毕,耗时1.25

6开始执行,进程号为23517

0 执行完毕,耗时1.37

7开始执行,进程号为23515

5 执行完毕,耗时0.83

8开始执行,进程号为23516

8 执行完毕,耗时0.33

9开始执行,进程号为23516

7 执行完毕,耗时0.72

9 执行完毕,耗时0.34

6 执行完毕,耗时1.71

函数worker-0-over

函数worker-1-over

函数worker-2-over

函数worker-3-over

函数worker-4-over

函数worker-5-over

函数worker-6-over

函数worker-7-over

函数worker-8-over

函数worker-9-over

-----end-----

"""

5.进程池实现文件拷贝

import multiprocessing

import os

import time

import random

def copy_file(queue, file_name, source_folder_name, dest_folder_name):

"""copy文件到指定的路径"""

f_read = open(source_folder_name + "/" + file_name, "rb")

f_write = open(dest_folder_name + "/" + file_name, "wb")

while True:

time.sleep(random.random())

content = f_read.read(1024)

if content:

f_write.write(content)

else:

break

f_read.close()

f_write.close()

# 发送已经拷贝完毕的文件名字

queue.put(file_name)

def main():

# 获取要复制的文件夹

source_folder_name = input("请输入要复制文件夹名字:")

# 整理目标文件夹

dest_folder_name = source_folder_name + "[副本]"

# 创建目标文件夹

try:

os.mkdir(dest_folder_name)

except:

pass# 如果文件夹已经存在,那么创建会失败

# 获取这个文件夹中所有的普通文件名

file_names = os.listdir(source_folder_name)

# 创建Queue

queue = multiprocessing.Manager().Queue()

# 创建进程池

pool = multiprocessing.Pool(3)

for file_name in file_names:

# 向进程池中添加任务

pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))

# 主进程显示进度

pool.close()

all_file_num = len(file_names)

while True:

file_name = queue.get()

if file_name in file_names:

file_names.remove(file_name)

copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num

print("%.2f...(%s)" % (copy_rate, file_name) + "" * 50, end="")

if copy_rate >= 100:

break

print()

if__name__ == "__main__":

main()

6.进程池实现图片之家古装美女图片爬虫

import os

from multiprocessing import Pool

import requests

from bs4 import BeautifulSoup

def get_url(url):

res = requests.get(url)

li = list()

if res.status_code == 200:

# 返回网页源代码

soup = BeautifulSoup(res.text, "html.parser")

# print(soup)

# 返回含有图片url的div标签

re = soup.find("div", class_="list_con_box").find_all("li")

# print(re)

# 数据清洗

for i in re:

img_s = i.find("img") # 返回含有图片url的img标签

if img_s:

src_s = img_s.get("src") # 返回图片url

# print(src_s) # https://img.tupianzj.com/uploads/allimg/200828/30-200RQ110300-L.jpg

li.append(src_s)

return li

def get_img(url):

s = url.split("/")[-1]

# print(s) # 30-200RQ110300-L.jpg

r = requests.get(url)

if r.status_code == 200:

with open("./古装美女/" + s, "wb") as f:

f.write(r.content)

def main():

headers = {

"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:75.0) Gecko/20100101 Firefox/75.0"

}

# 图片之家古装美女url

url = "https://www.tupianzj.com/meinv/guzhuang/"

# 获取图片url列表

img_url = get_url(url)

# 提取图片名称并保存到本地

ifnot os.path.isdir("./古装美女"):

os.mkdir("./古装美女")

# 用进程池实现多任务下载

pool = Pool(7)

pool.map(get_img, img_url) # 效果等同于以下注释的两行代码

# for i in img_url:

# pool.apply_async(get_img, (i,))

pool.close()

pool.join()

if__name__ == "__main__":

main()

7.进程池中主进程调用回调函数

from multiprocessing import Pool

import requests

import os

def func(url):

res = requests.get(url)

print("子进程的pid:%s,父进程的pid:%s"%(os.getpid(),os.getppid()))

# print(res.text)

if res.status_code == 200:

return url,res.text

def cal_back(sta):

url, text = sta

print("回调函数的pid", os.getpid())

with open("a.txt", "a", encoding="utf-8") as f:

f.write(url + text)

# print("回调函数中!", url)

if__name__ == "__main__":

p = Pool(5)

l = ["https://www.baidu.com",

"http://www.jd.com",

"http://www.taobao.com",

"http://www.mi.com",

"http://www.cnblogs.com",

"https://www.bilibili.com",

]

print("主进程的pid", os.getpid())

for i in l:

p.apply_async(func, args=(i,), callback=cal_back)

# 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到

p.close()

p.join()

以上是 04_进程池 的全部内容, 来源链接: utcz.com/z/529971.html

回到顶部