04_进程池
1.为什么用进程池
1.在需要频繁的创建删除较多进程的情况下,导致计算机资源消耗过多
2.进程池则是创建指定进程数量等待执行事件,避免了不必要的创建和销毁过程
2.进程池的使用步骤
1.创建进程池,在池内放入适量的进程,将事件加入进程池的等待队列
2.使用进程池中的进程不断处理事件,所有事件处理后回收关闭进程池
3.语法概述
from multiprocessing import Poolpool
= Pool(processes=4) # 创建指定进程数量进程池并返回进程池对象# 异步方式将事件放入进程池执行,返回一个对象该对象
# callback: 回调函数,每当进程池中的进程处理完任务了,返回的结果交给回调函数,由回调函数进一步处理,回调函数只有异步时才有
ret = pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
参数:
func要执行的事件函数,可以通过返回值对象的 ret.get() 方法得到func函数的返回值
args: 位置参数元组,要给函数传递的参数
kwargs: 键值对参数字典,要给函数传递的参数
callback: 进程的任务函数返回值被当做回调函数的形参接收到,以此进行进一步的处理操作,回调函数是主进程调用的
error_callback: 错误时主进程执行的回调函数
pool.close() # 关闭进程池,使其无法加入新的事件
pool.join() # 阻塞等待进程池退出(当所有事情处理完毕后)
pool.apply() # 用法和 pool.apply_async() 一样,但是没有返回值,异步方式将事件放入进程池顺序执行,一个事件结束再执行另一个事件
func_list = pool.map(func, iter) # 类似于内建函数map,将第二个参数的迭代数传递给第一个参数的函数执行,同时兼容了使用进程池执行
# 示例:
from multiprocessing import Pool
import time
def fun(num):
time.sleep(1)
return num * num
test = [1, 2, 3, 4, 5, 6]
pool = Pool(3)
r = pool.map(fun, test) # 返回fun函数的返回值列表
# 上面折行代码等同于以下注释部分的代码
# r = []
# for i in test:
# res = pool.apply_async(fun, (i,))
# r.append(res.get())
print(r)
pool.close()
pool.join()
4.查看进程池中进程的进程号
from multiprocessing import Poolimport osimport timeimport randomimport sysdef worker(msg):t_start
= time.time()print("%s开始执行,进程号为%d" % (msg, os.getpid()))# random.random()随机生成0~1之间的浮点数time.sleep(random.random() * 2)
t_stop = time.time()
print(msg, "执行完毕,耗时%0.2f" % (t_stop - t_start))
return"函数{}-{}-{}".format(sys._getframe().f_code.co_name, str(msg), "over")
ret = list()
po = Pool(3) # 定义一个进程池,最大进程数3
for i in range(0, 10):
# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标
r = po.apply_async(worker, (i,))
ret.append(r)
print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
for r in ret:
print(r.get()) # 可以通过返回值对象的 r.get() 方法得到worker函数的返回值
print("-----end-----")
"""执行结果
----start----
0开始执行,进程号为23515
1开始执行,进程号为23516
2开始执行,进程号为23517
1 执行完毕,耗时0.07
3开始执行,进程号为23516
3 执行完毕,耗时0.08
4开始执行,进程号为23516
4 执行完毕,耗时0.66
5开始执行,进程号为23516
2 执行完毕,耗时1.25
6开始执行,进程号为23517
0 执行完毕,耗时1.37
7开始执行,进程号为23515
5 执行完毕,耗时0.83
8开始执行,进程号为23516
8 执行完毕,耗时0.33
9开始执行,进程号为23516
7 执行完毕,耗时0.72
9 执行完毕,耗时0.34
6 执行完毕,耗时1.71
函数worker-0-over
函数worker-1-over
函数worker-2-over
函数worker-3-over
函数worker-4-over
函数worker-5-over
函数worker-6-over
函数worker-7-over
函数worker-8-over
函数worker-9-over
-----end-----
"""
5.进程池实现文件拷贝
import multiprocessingimport osimport timeimport randomdef copy_file(queue, file_name, source_folder_name, dest_folder_name):"""copy文件到指定的路径"""f_read
= open(source_folder_name + "/" + file_name, "rb")f_write
= open(dest_folder_name + "/" + file_name, "wb")while True:time.sleep(random.random())
content
= f_read.read(1024)if content:f_write.write(content)
else:breakf_read.close()
f_write.close()
# 发送已经拷贝完毕的文件名字queue.put(file_name)
def main():
# 获取要复制的文件夹
source_folder_name = input("请输入要复制文件夹名字:")
# 整理目标文件夹
dest_folder_name = source_folder_name + "[副本]"
# 创建目标文件夹
try:
os.mkdir(dest_folder_name)
except:
pass# 如果文件夹已经存在,那么创建会失败
# 获取这个文件夹中所有的普通文件名
file_names = os.listdir(source_folder_name)
# 创建Queue
queue = multiprocessing.Manager().Queue()
# 创建进程池
pool = multiprocessing.Pool(3)
for file_name in file_names:
# 向进程池中添加任务
pool.apply_async(copy_file, args=(queue, file_name, source_folder_name, dest_folder_name))
# 主进程显示进度
pool.close()
all_file_num = len(file_names)
while True:
file_name = queue.get()
if file_name in file_names:
file_names.remove(file_name)
copy_rate = (all_file_num - len(file_names)) * 100 / all_file_num
print("%.2f...(%s)" % (copy_rate, file_name) + "" * 50, end="")
if copy_rate >= 100:
break
print()
if__name__ == "__main__":
main()
6.进程池实现图片之家古装美女图片爬虫
import osfrom multiprocessing import Poolimport requestsfrom bs4 import BeautifulSoupdef get_url(url):res
= requests.get(url)li
= list()if res.status_code == 200:# 返回网页源代码soup = BeautifulSoup(res.text, "html.parser")
# print(soup)
# 返回含有图片url的div标签
re = soup.find("div", class_="list_con_box").find_all("li")
# print(re)
# 数据清洗
for i in re:
img_s = i.find("img") # 返回含有图片url的img标签
if img_s:
src_s = img_s.get("src") # 返回图片url
# print(src_s) # https://img.tupianzj.com/uploads/allimg/200828/30-200RQ110300-L.jpg
li.append(src_s)
return li
def get_img(url):
s = url.split("/")[-1]
# print(s) # 30-200RQ110300-L.jpg
r = requests.get(url)
if r.status_code == 200:
with open("./古装美女/" + s, "wb") as f:
f.write(r.content)
def main():
headers = {
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:75.0) Gecko/20100101 Firefox/75.0"
}
# 图片之家古装美女url
url = "https://www.tupianzj.com/meinv/guzhuang/"
# 获取图片url列表
img_url = get_url(url)
# 提取图片名称并保存到本地
ifnot os.path.isdir("./古装美女"):
os.mkdir("./古装美女")
# 用进程池实现多任务下载
pool = Pool(7)
pool.map(get_img, img_url) # 效果等同于以下注释的两行代码
# for i in img_url:
# pool.apply_async(get_img, (i,))
pool.close()
pool.join()
if__name__ == "__main__":
main()
7.进程池中主进程调用回调函数
from multiprocessing import Poolimport requestsimport osdef func(url):res
= requests.get(url)print("子进程的pid:%s,父进程的pid:%s"%(os.getpid(),os.getppid()))# print(res.text)if res.status_code == 200:
return url,res.text
def cal_back(sta):
url, text = sta
print("回调函数的pid", os.getpid())
with open("a.txt", "a", encoding="utf-8") as f:
f.write(url + text)
# print("回调函数中!", url)
if__name__ == "__main__":
p = Pool(5)
l = ["https://www.baidu.com",
"http://www.jd.com",
"http://www.taobao.com",
"http://www.mi.com",
"http://www.cnblogs.com",
"https://www.bilibili.com",
]
print("主进程的pid", os.getpid())
for i in l:
p.apply_async(func, args=(i,), callback=cal_back)
# 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
p.close()
p.join()
以上是 04_进程池 的全部内容, 来源链接: utcz.com/z/529971.html