python 进程

- 正在执行中的程序称为进程。进程的执行会占用内存等资源。多个进程同时执行时,每个进程的执行都需要由操作系统按一定的算法(RR调度、优先数调度算法等)分配内存空间 
创建一个进程第一种方式
python"># process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。from multiprocessing import Process
def func(n):
    # 子进程函数
    # 获取当前进程id
    print("当前进程id:", os.getpid())
    # 获取父进程id
    print("父进程id:", os.getppid())
    for i in range(10):
        time.sleep(2)
        print("子进程", n)
# 子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印
print("-----------------")
# Windows下写代码开启子进程时,必须写上if __name__ == ‘__main__’:
if __name__ == "__main__":
    # 首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程
    # 将函数注册到一个进程中,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的。
    p = Process(target=func, args=("传参",))  # args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    # 告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,
    # 而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。
    p.start()  # start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。
	p.json() # 等待子进程执行结束 主进程 才能继续往下执行
    # 获取当前进程id
    print("当前进程id:", os.getpid())
    # 获取父进程id  Pycharm 进程的ID
    print("父进程id Pycharm 进程的ID:", os.getppid())
    # 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步
    for i in range(10):
        time.sleep(1)
        print("父进程")
process 类中的 参数
group   # 未使用 值始终为 Nonetarget  # 表示调用对象,即子进程要执行的任务
args   #  表示调用对象的 参数元祖  args=(..., )
kwargs  # 表示调用对象的字典 kwargs={"name":'张',}
name  # 为子进程的名字 定义子进程的名字
进程对象的 方法
from multiprocessing import Processp = Process(target=func,)
p.start()  # 启动进程  
p.json()   # 等待子进程执行结束 主进程 才能继续往下执行
p.Terminate()  # 关闭进程 不是自己关闭 而是 给操作系统 发送了一个关闭进程的信号
p.is_alive()  # 查看进程是否还活着
p.daemon = True  # 设置进程为守护进程 写在 start()之前 子进程会跟父进程一起结束
p.name  # 进程的名字
p.pid  # 进程的 id
创建进行的第二种方式:
- 自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法
from multiprocessing import Processimport os
# 自定义类名  继承Process
class Myprocess(Process):
    def __init__(self, name):
        # 调用父类的__init__()方法
        super().__init__()
        self.name = name
    # 进程类 必须重写run方法
    def run(self):
        print("我是子进程的 id", os.getpid())
        print(self.name)  
# W 下必须写 __name__ == "__main__"
if __name__ == "__main__":
    # 实现 多进程类对象
    p = Myprocess("张飞")
    # 开启进程
    #给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法
    p.start()
	print('p1.name',p1.name)
	print('p1.pid',p1.pid)
		print('主进程结束')
print("我是父进程", os.getpid())
验证进程间的空间隔离
import timefrom multiprocessing import Process
# 进程之间是空间隔离的,不共享资源
global_num = 100
def func1():
    global global_num
    global_num = 0
    print('子进程全局变量>>>', global_num)
if __name__ == '__main__':
    p1 = Process(target=func1, )
    p1.start()
    time.sleep(1)
    print('主进程的全局变量>>>', global_num)
僵和孤儿进程
- 进程结束后资源回收 主进程不会管子进程 自己结束
- 使用 p.json() 主进程会等待子进程结束后 才结束
import timeimport os
from multiprocessing import Process
def func1():
    time.sleep(30)
    print(os.getpid())
    print('子进程')
if __name__ == '__main__':
    p1 = Process(target=func1,)
    p1.start()
    # p1.join()
    # time.sleep(2)
    # print(p1.pid)
    print('主进程的ID',os.getpid())
    print('主进程结束')
同步锁 Lock
- 同步效率低,但是保证了数据安全 重点 
- 抢票案例 - import random- import json - import time - from multiprocessing import Process, Lock - def quang(i, lock): - print("等待抢票") - time.sleep(1) - lock.acquire() # 锁头 只有一把 - with open("aaa", "r") as f: - dic = json.load(f) - if dic["piao"] > 0: - time.sleep(random.random()) - dic["piao"] -= 1 - with open("aaa", "w") as f1: - json.dump(dic, f1) - print("%s强盗了" % i) - else: - print("%s没票了" % i) - lock.release() # 还锁 - if __name__ == "__main__": - lo = Lock() - for i in range(10): - p = Process(target=quang, args=(i, lo)) - p.start() 
信号量 Semaphore
- 阿斯蒂芬 - import random- import time - from multiprocessing import Process, Semaphore - def dbj(i, s): - # 信息 入口 - s.acquire() - print("%s号顾客来洗脚了" % i) - time.sleep(random.randrange(2, 7)) - # 信息 出口 - s.release() - if __name__ == "__main__": - # 信息 - # 创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待. - s = Semaphore(5) - for i in range(20): - p = Process(target=dbj, args=(i, s)) - p.start() 
事件 Event
- 通过事件来完成红路灯 - import random- import time - from multiprocessing import Process, Event - def hld(e): - while 1: - print("红灯了!!") - time.sleep(3) - # 将事件状态改为 True - e.set() - print("绿灯了") - time.sleep(5) - # 将事件状态改为 False - e.clear() - def car(i, e): - # e.is_set() 查看事件状态 True False - if e.is_set(): - print("%s号车直接通过" % i) - else: - print("%s号车等红灯" % i) - # 等待 如果 状态为 True 时 向后执行 - e.wait() - print("%s号车绿灯通过" % i) - if __name__ == "__main__": - e = Event() - p1 = Process(target=hld, args=(e,)) - p1.start() - for i in range(1000): - time.sleep(random.random()) - p = Process(target=car, args=(i, e)) - p.start() - # 方法 - e.is_set() # 查看事件状态(通过改变事件状态来控制事件其他进程的运行) - e.set() # 将事件 改为 True - e.clear() # 将事件改为 False - e.wait() # 等待 如果 状态为 True 时 向后执行 
进程间通信IPC
- 水电费 - import time- from multiprocessing import Process,Queue - def girl(q): - print('来自boy的信息',q.get()) - print('来自校领导的凝视',q.get()) - def boy(q): - q.put('约吗') - if __name__ == '__main__': - q = Queue(5) - boy_p = Process(target=boy,args=(q,)) - girl_p = Process(target=girl,args=(q,)) - boy_p.start() - girl_p.start() - time.sleep(1) - q.put('好好工作,别乱搞') 
队列 Queue #重点
- 先进先出- import random- import time - from multiprocessing import Process, Event, Queue - q = Queue(3) - q.put(1) # 将对象放入队列中 会有细微的 延迟 - q.put(2) - print("队列是否已经满了", q.full()) # 队列是否已经满了 - q.put(3) - print("队列是否已经满了", q.full()) - # q.put(3) - print(q.get()) # 取数据 - print("队列是否空了", q.empty()) # 队列是否空了 - print(q.get()) - print(q.get()) - print("队列是否空了", q.empty()) - print(q.get()) - print(q.get(False)) # 判断队列是否已经空了 空了就报错 queue.Empty - q.qsize() # 获取队列当前大小 就是已存在的数据个数 不可靠 - while 1: - try: - print(q.get(False)) - except: - print("11111") - break - def boy(q): - q.put("约吗") - def girl(q): - while 1: - try: - print(q.get(False)) # == q.get_nowait() 如果队列空则报错 - except: - pass - if __name__ == "__main__": - q = Queue(5) - boy = Process(target=boy, args=(q,)) - girl = Process(target=girl, args=(q,)) - boy.start() - girl.start() - time.sleep(1) - q.put("好好学习") 
生产者消费者模型
- 解耦 缓冲 降低生产者与消费者之间的 耦合性 - import time - from multiprocessing import Process, Queue - def producer(p): - for i in range(1, 11): - time.sleep(1) - print("生产了包子%s" % i) - # 将生产的包子添加到队列中 - p.put("包子%s" % i) - # 生产结束后在队列末尾 添加一个空信号 - p.put(None) - def consumer(p,i): - while 1: - time.sleep(1.5) - # 循环 取出所有元素 - pp = p.get() - if pp: - print("%s吃了" % i, pp) - else: - print("%s吃完了" % i) - # 将空信息还回去 - p.put(None) - break - if __name__ == "__main__": - q = Queue(10) - # 创建 - pro_p = Process(target=producer, args=(q,)) - pro_p.start() - for i in range(2): - con_p = Process(target=consumer, args=(q,i)) - con_p.start() - # p2.join() - # p.put(None) 
JoinableQueue
- JoinableQueue的生产者消费者模型 
import timefrom multiprocessing import Process, JoinableQueue
# 生产者
def producer(q):
    for i in range(10):
        time.sleep(0.5)
        # 创建10 个包子装进队列中
        q.put("包子%s号" % i)
        print("生产了 包子%s" % i)
    # 等待队列中所有内容被取走后 关闭本进程
    q.join()
    print("包子卖完了")
# 消费者
def consumer(q):
    while 1:
        time.sleep(1)
        # 循环吃包子
        print("吃了 %s" % q.get())
        # 每取 一个元素 就给 q.join 传递一个信号 记录取出个数
        q.task_done() 
if __name__ == "__main__":
    # 实现 JoinableQueue 队列 对象
    q = JoinableQueue(20)
    # 将 生产者加入进程
    pro_p = Process(target=producer, args=(q,))
    pro_p.start()
    # 将 消费者 加入进程
    con_p = Process(target=consumer, args=(q,))
    # 将消费者设置成守护进程  随 住程序一起关闭
    con_p.daemon = True
    con_p.start()
    # 等待进程 执行完闭 主程序才能关闭
    pro_p.join()
    print("关闭程序!")
管道 pipe
- 管道是不安全的 
from multiprocessing import Process, Pipe, Manager, Lock, Pooldef func(conn2):
    try:
        print(conn2.recv())
        print(conn2.recv())
    except EOFError:
        print("管道已经关闭了")
        conn2.close()
if __name__ == '__main__':
    try:
        conn1, conn2 = Pipe()  # 在创建 Process 对象之前创建管道
        p = Process(target=func, args=(conn2,))
        p.start()
        conn1.send("asdad")
        conn1.close()
        conn1.recv()
        p.join()
    except OSError:
        print("管道关闭>>>>>>>>>>")
# 方法 recv() 接收  send() 发送
#- 管道默认是双工的  
#  设置为 单工  参数:  duplex=False  改为单工  conn1 发送  conn2 接收  
如果另一端已经关闭 则 recv() 接收会报错
数据共享 Manager
- 多进程同时操作一个文件的数据 不加锁就会出现错误数据 
- 共享: 可以将一个数据传递到进程中 在不同的 作用于中 进程可对其进行修改 - import time, os- from multiprocessing import Process, Manager, Lock - '''资源共享''' - def func(mm): - mm["name"] = "张飞" - if __name__ == '__main__': - m = Manager() - mm = m.dict({"name": "aaaa"}) - print(mm) - p = Process(target=func, args=(mm,)) - p.start() - p.join() - print(mm) - def func(m_d, ml): - with ml: - m_d["count"] -= 1 - if __name__ == '__main__': - m = Manager() - ml = Lock() - m_d = m.dict({"count": 100}) - lis = [] - for i in range(20): - p = Process(target=func, args=(m_d, ml)) - p.start() - lis.append(p) - [i.join() for i in lis] - print(m_d) 
进程池 Pool
import timefrom multiprocessing import Process,Pool
def func(n):
    for i in range(5):
        n = n + i
    print(n)
if __name__ == '__main__':
    #验证一下传参
    pool = Pool(4)
    pool.map(func,range(100))  #map自带join功能,异步执行任务,参数是可迭代的
    p_list = []
    for i in range(200):
        p1 = Process(target=func,args=(i,))
        p1.start()
        p_list.append(p1)
    [p.join() for p in p_list]
def func(n):
    print(n)
    time.sleep(1)
    return n * n
if __name__ == '__main__':
    pool = Pool(4)  # 进程池 的个数
    lis = []
    for i in range(10):
        # res = p.apply(fun,args=(i,))  #同步执行的方法,他会等待你的任务的返回结果,
        # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了。
        # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
        # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
        # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
        ret = pool.apply_async(func, args=(i,))  # 异步执行的方法,他会等待你的任务的返回结果,
        # print(ret.get())
        lis.append(ret)
    # print(lis)
    pool.close()  # 不是关闭进程池,而是不允许再有其他任务来使用进程池
    pool.join()  # 这是感知进程池中任务的方法,等待进程池的任务全部执行完
	pool.ready()  # 如果调用完成 返回 True
    pool.terminate()  # 立即终止所有进程
    for i in lis:
        print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
    print("主程序结束")
map 传参
import timefrom multiprocessing import Process,Pool
def func(n):
    print(n)
if __name__ == '__main__':
    pool = Pool(4)
    # pool.map(func,range(100)) #参数是可迭代的
    pool.map(func,['sb',(1,2)]) #参数是可迭代的
    # pool.map(func,range(100))  #map自带join功能,异步执行任务,参数是可迭代的
回调函数 callback
- 回调函数的形参只能有一个 如果执行函数有多个返回值 那么 回调函数 接收的是一个元祖 包含所有执行函数的返回值 - import time, os- from multiprocessing import Process, Pool - def func1(n): - print(os.getpid()) - n += 10 - return n - def func2(nn): - # 回调函数 接收 func1 的返回值 - print(os.getpid()) - print(nn) - return 10 - if __name__ == '__main__': - pool = Pool() - print(os.getpid()) - lis = [] - c = pool.apply_async(func1, args=(1,), callback=func2) # 将func1 反回的结果交给func2 执行 - pool.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池 - pool.join() # 这是感知进程池中任务的方法,等待进程池的任务全部执行完 
多进程爬虫
from multiprocessing import Process, Poolfrom urllib.request import urlopen
import ssl, re
# ⼲掉数字签名证书
ssl._create_default_https_context = ssl._create_unverified_context
urls = [
    'https://www.baidu.com',
    'https://www.python.org',
    'https://www.openstack.org',
    'https://help.github.com/',
    'http://www.sina.com.cn/'
]
def func1(path):
    # 打开网址
    condent = urlopen(path)
    # 返回网页源代码
    return condent.read().decode("utf-8")
def func2(content):
    com = re.compile(r"<a(?P<aaa>.*?)</a>")
    cont = re.findall(com, content)
    print(cont)
if __name__ == '__main__':
    pool = Pool()
    for path in urls:
        tar = pool.apply_async(func1, args=(path,), callback=func2)
    pool.close()
    pool.join()
以上是 python 进程 的全部内容, 来源链接: utcz.com/z/387916.html







