python 进程

python

  • 正在执行中的程序称为进程。进程的执行会占用内存等资源。多个进程同时执行时,每个进程的执行都需要由操作系统按一定的算法(RR调度、优先数调度算法等)分配内存空间

创建一个进程第一种方式

python"># process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

from multiprocessing import Process

def func(n):

# 子进程函数

# 获取当前进程id

print("当前进程id:", os.getpid())

# 获取父进程id

print("父进程id:", os.getppid())

for i in range(10):

time.sleep(2)

print("子进程", n)

# 子进程中的程序相当于import的主进程中的程序,那么import的时候会不会执行你import的那个文件的程序啊,前面学的,是会执行的,所以出现了两次打印

print("-----------------")

# Windows下写代码开启子进程时,必须写上if __name__ == ‘__main__’:

if __name__ == "__main__":

# 首先我运行当前这个test.py文件,运行这个文件的程序,那么就产生了进程,这个进程我们称为主进程

# 将函数注册到一个进程中,此时还没有启动进程,只是创建了一个进程对象。并且func是不加括号的。

p = Process(target=func, args=("传参",)) # args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

# 告诉操作系统,给我开启一个进程,func这个函数就被我们新开的这个进程执行了,

# 而这个进程是我主进程运行过程中创建出来的,所以称这个新创建的进程为主进程的子进程,而主进程又可以称为这个新进程的父进程。

p.start() # start并不是直接就去执行了,我们知道进程有三个状态,进程会进入进程的三个状态,就绪,(被调度,也就是时间片切换到它的时候)执行,阻塞,并且在这个三个状态之间不断的转换,等待cpu执行时间片到了。

p.json() # 等待子进程执行结束 主进程 才能继续往下执行

# 获取当前进程id

print("当前进程id:", os.getpid())

# 获取父进程id Pycharm 进程的ID

print("父进程id Pycharm 进程的ID:", os.getppid())

# 这是主进程的程序,上面开启的子进程的程序是和主进程的程序同时运行的,我们称为异步

for i in range(10):

time.sleep(1)

print("父进程")

process 类中的 参数

group   # 未使用 值始终为 None

target # 表示调用对象,即子进程要执行的任务

args # 表示调用对象的 参数元祖 args=(..., )

kwargs # 表示调用对象的字典 kwargs={"name":'张',}

name # 为子进程的名字 定义子进程的名字

进程对象的 方法

from multiprocessing import Process

p = Process(target=func,)

p.start() # 启动进程

p.json() # 等待子进程执行结束 主进程 才能继续往下执行

p.Terminate() # 关闭进程 不是自己关闭 而是 给操作系统 发送了一个关闭进程的信号

p.is_alive() # 查看进程是否还活着

p.daemon = True # 设置进程为守护进程 写在 start()之前 子进程会跟父进程一起结束

p.name # 进程的名字

p.pid # 进程的 id

创建进行的第二种方式:

  • 自己定义一个类,继承Process类,必须写一个run方法,想传参数,自行写init方法,然后执行super父类的init方法

from multiprocessing import Process

import os

# 自定义类名 继承Process

class Myprocess(Process):

def __init__(self, name):

# 调用父类的__init__()方法

super().__init__()

self.name = name

# 进程类 必须重写run方法

def run(self):

print("我是子进程的 id", os.getpid())

print(self.name)

# W 下必须写 __name__ == "__main__"

if __name__ == "__main__":

# 实现 多进程类对象

p = Myprocess("张飞")

# 开启进程

#给操作系统发送创建进程的指令,子进程创建好之后,要被执行,执行的时候就会执行run方法

p.start()

print('p1.name',p1.name)

print('p1.pid',p1.pid)

print('主进程结束')

print("我是父进程", os.getpid())

验证进程间的空间隔离

import time

from multiprocessing import Process

# 进程之间是空间隔离的,不共享资源

global_num = 100

def func1():

global global_num

global_num = 0

print('子进程全局变量>>>', global_num)

if __name__ == '__main__':

p1 = Process(target=func1, )

p1.start()

time.sleep(1)

print('主进程的全局变量>>>', global_num)

僵和孤儿进程

  • 进程结束后资源回收 主进程不会管子进程 自己结束
  • 使用 p.json() 主进程会等待子进程结束后 才结束

import time

import os

from multiprocessing import Process

def func1():

time.sleep(30)

print(os.getpid())

print('子进程')

if __name__ == '__main__':

p1 = Process(target=func1,)

p1.start()

# p1.join()

# time.sleep(2)

# print(p1.pid)

print('主进程的ID',os.getpid())

print('主进程结束')

同步锁 Lock

  • 同步效率低,但是保证了数据安全 重点

  • 抢票案例

    import random

    import json

    import time

    from multiprocessing import Process, Lock

    def quang(i, lock):

    print("等待抢票")

    time.sleep(1)

    lock.acquire() # 锁头 只有一把

    with open("aaa", "r") as f:

    dic = json.load(f)

    if dic["piao"] > 0:

    time.sleep(random.random())

    dic["piao"] -= 1

    with open("aaa", "w") as f1:

    json.dump(dic, f1)

    print("%s强盗了" % i)

    else:

    print("%s没票了" % i)

    lock.release() # 还锁

    if __name__ == "__main__":

    lo = Lock()

    for i in range(10):

    p = Process(target=quang, args=(i, lo))

    p.start()

信号量 Semaphore

  • 阿斯蒂芬

    import random

    import time

    from multiprocessing import Process, Semaphore

    def dbj(i, s):

    # 信息 入口

    s.acquire()

    print("%s号顾客来洗脚了" % i)

    time.sleep(random.randrange(2, 7))

    # 信息 出口

    s.release()

    if __name__ == "__main__":

    # 信息

    # 创建一个计数器,每次acquire就减1,直到减到0,那么上面的任务只有4个在同时异步的执行,后面的进程需要等待.

    s = Semaphore(5)

    for i in range(20):

    p = Process(target=dbj, args=(i, s))

    p.start()

事件 Event

  • 通过事件来完成红路灯

    import random

    import time

    from multiprocessing import Process, Event

    def hld(e):

    while 1:

    print("红灯了!!")

    time.sleep(3)

    # 将事件状态改为 True

    e.set()

    print("绿灯了")

    time.sleep(5)

    # 将事件状态改为 False

    e.clear()

    def car(i, e):

    # e.is_set() 查看事件状态 True False

    if e.is_set():

    print("%s号车直接通过" % i)

    else:

    print("%s号车等红灯" % i)

    # 等待 如果 状态为 True 时 向后执行

    e.wait()

    print("%s号车绿灯通过" % i)

    if __name__ == "__main__":

    e = Event()

    p1 = Process(target=hld, args=(e,))

    p1.start()

    for i in range(1000):

    time.sleep(random.random())

    p = Process(target=car, args=(i, e))

    p.start()

    # 方法

    e.is_set() # 查看事件状态(通过改变事件状态来控制事件其他进程的运行)

    e.set() # 将事件 改为 True

    e.clear() # 将事件改为 False

    e.wait() # 等待 如果 状态为 True 时 向后执行

进程间通信IPC

  • 水电费

    import time

    from multiprocessing import Process,Queue

    def girl(q):

    print('来自boy的信息',q.get())

    print('来自校领导的凝视',q.get())

    def boy(q):

    q.put('约吗')

    if __name__ == '__main__':

    q = Queue(5)

    boy_p = Process(target=boy,args=(q,))

    girl_p = Process(target=girl,args=(q,))

    boy_p.start()

    girl_p.start()

    time.sleep(1)

    q.put('好好工作,别乱搞')

队列 Queue #重点

  • 先进先出

    import random

    import time

    from multiprocessing import Process, Event, Queue

    q = Queue(3)

    q.put(1) # 将对象放入队列中 会有细微的 延迟

    q.put(2)

    print("队列是否已经满了", q.full()) # 队列是否已经满了

    q.put(3)

    print("队列是否已经满了", q.full())

    # q.put(3)

    print(q.get()) # 取数据

    print("队列是否空了", q.empty()) # 队列是否空了

    print(q.get())

    print(q.get())

    print("队列是否空了", q.empty())

    print(q.get())

    print(q.get(False)) # 判断队列是否已经空了 空了就报错 queue.Empty

    q.qsize() # 获取队列当前大小 就是已存在的数据个数 不可靠

    while 1:

    try:

    print(q.get(False))

    except:

    print("11111")

    break

    def boy(q):

    q.put("约吗")

    def girl(q):

    while 1:

    try:

    print(q.get(False)) # == q.get_nowait() 如果队列空则报错

    except:

    pass

    if __name__ == "__main__":

    q = Queue(5)

    boy = Process(target=boy, args=(q,))

    girl = Process(target=girl, args=(q,))

    boy.start()

    girl.start()

    time.sleep(1)

    q.put("好好学习")

生产者消费者模型

  • 解耦 缓冲 降低生产者与消费者之间的 耦合性

    import time

    from multiprocessing import Process, Queue

    def producer(p):

    for i in range(1, 11):

    time.sleep(1)

    print("生产了包子%s" % i)

    # 将生产的包子添加到队列中

    p.put("包子%s" % i)

    # 生产结束后在队列末尾 添加一个空信号

    p.put(None)

    def consumer(p,i):

    while 1:

    time.sleep(1.5)

    # 循环 取出所有元素

    pp = p.get()

    if pp:

    print("%s吃了" % i, pp)

    else:

    print("%s吃完了" % i)

    # 将空信息还回去

    p.put(None)

    break

    if __name__ == "__main__":

    q = Queue(10)

    # 创建

    pro_p = Process(target=producer, args=(q,))

    pro_p.start()

    for i in range(2):

    con_p = Process(target=consumer, args=(q,i))

    con_p.start()

    # p2.join()

    # p.put(None)

JoinableQueue

  • JoinableQueue的生产者消费者模型

import time

from multiprocessing import Process, JoinableQueue

# 生产者

def producer(q):

for i in range(10):

time.sleep(0.5)

# 创建10 个包子装进队列中

q.put("包子%s号" % i)

print("生产了 包子%s" % i)

# 等待队列中所有内容被取走后 关闭本进程

q.join()

print("包子卖完了")

# 消费者

def consumer(q):

while 1:

time.sleep(1)

# 循环吃包子

print("吃了 %s" % q.get())

# 每取 一个元素 就给 q.join 传递一个信号 记录取出个数

q.task_done()

if __name__ == "__main__":

# 实现 JoinableQueue 队列 对象

q = JoinableQueue(20)

# 将 生产者加入进程

pro_p = Process(target=producer, args=(q,))

pro_p.start()

# 将 消费者 加入进程

con_p = Process(target=consumer, args=(q,))

# 将消费者设置成守护进程 随 住程序一起关闭

con_p.daemon = True

con_p.start()

# 等待进程 执行完闭 主程序才能关闭

pro_p.join()

print("关闭程序!")

管道 pipe

  • 管道是不安全的

from multiprocessing import Process, Pipe, Manager, Lock, Pool

def func(conn2):

try:

print(conn2.recv())

print(conn2.recv())

except EOFError:

print("管道已经关闭了")

conn2.close()

if __name__ == '__main__':

try:

conn1, conn2 = Pipe() # 在创建 Process 对象之前创建管道

p = Process(target=func, args=(conn2,))

p.start()

conn1.send("asdad")

conn1.close()

conn1.recv()

p.join()

except OSError:

print("管道关闭>>>>>>>>>>")

# 方法 recv() 接收 send() 发送

#- 管道默认是双工的

# 设置为 单工 参数: duplex=False 改为单工 conn1 发送 conn2 接收

如果另一端已经关闭 则 recv() 接收会报错

数据共享 Manager

  • 多进程同时操作一个文件的数据 不加锁就会出现错误数据

  • 共享: 可以将一个数据传递到进程中 在不同的 作用于中 进程可对其进行修改

    import time, os

    from multiprocessing import Process, Manager, Lock

    '''资源共享'''

    def func(mm):

    mm["name"] = "张飞"

    if __name__ == '__main__':

    m = Manager()

    mm = m.dict({"name": "aaaa"})

    print(mm)

    p = Process(target=func, args=(mm,))

    p.start()

    p.join()

    print(mm)

    def func(m_d, ml):

    with ml:

    m_d["count"] -= 1

    if __name__ == '__main__':

    m = Manager()

    ml = Lock()

    m_d = m.dict({"count": 100})

    lis = []

    for i in range(20):

    p = Process(target=func, args=(m_d, ml))

    p.start()

    lis.append(p)

    [i.join() for i in lis]

    print(m_d)

进程池 Pool

import time

from multiprocessing import Process,Pool

def func(n):

for i in range(5):

n = n + i

print(n)

if __name__ == '__main__':

#验证一下传参

pool = Pool(4)

pool.map(func,range(100)) #map自带join功能,异步执行任务,参数是可迭代的

p_list = []

for i in range(200):

p1 = Process(target=func,args=(i,))

p1.start()

p_list.append(p1)

[p.join() for p in p_list]

def func(n):

print(n)

time.sleep(1)

return n * n

if __name__ == '__main__':

pool = Pool(4) # 进程池 的个数

lis = []

for i in range(10):

# res = p.apply(fun,args=(i,)) #同步执行的方法,他会等待你的任务的返回结果,

# 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行,并且可以执行不同的任务,传送任意的参数了。

# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务

# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束

# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。

ret = pool.apply_async(func, args=(i,)) # 异步执行的方法,他会等待你的任务的返回结果,

# print(ret.get())

lis.append(ret)

# print(lis)

pool.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池

pool.join() # 这是感知进程池中任务的方法,等待进程池的任务全部执行完

pool.ready() # 如果调用完成 返回 True

pool.terminate() # 立即终止所有进程

for i in lis:

print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

print("主程序结束")

map 传参

import time

from multiprocessing import Process,Pool

def func(n):

print(n)

if __name__ == '__main__':

pool = Pool(4)

# pool.map(func,range(100)) #参数是可迭代的

pool.map(func,['sb',(1,2)]) #参数是可迭代的

# pool.map(func,range(100)) #map自带join功能,异步执行任务,参数是可迭代的

回调函数 callback

  • 回调函数的形参只能有一个 如果执行函数有多个返回值 那么 回调函数 接收的是一个元祖 包含所有执行函数的返回值

    import time, os

    from multiprocessing import Process, Pool

    def func1(n):

    print(os.getpid())

    n += 10

    return n

    def func2(nn):

    # 回调函数 接收 func1 的返回值

    print(os.getpid())

    print(nn)

    return 10

    if __name__ == '__main__':

    pool = Pool()

    print(os.getpid())

    lis = []

    c = pool.apply_async(func1, args=(1,), callback=func2) # 将func1 反回的结果交给func2 执行

    pool.close() # 不是关闭进程池,而是不允许再有其他任务来使用进程池

    pool.join() # 这是感知进程池中任务的方法,等待进程池的任务全部执行完

多进程爬虫

from multiprocessing import Process, Pool

from urllib.request import urlopen

import ssl, re

# ⼲掉数字签名证书

ssl._create_default_https_context = ssl._create_unverified_context

urls = [

'https://www.baidu.com',

'https://www.python.org',

'https://www.openstack.org',

'https://help.github.com/',

'http://www.sina.com.cn/'

]

def func1(path):

# 打开网址

condent = urlopen(path)

# 返回网页源代码

return condent.read().decode("utf-8")

def func2(content):

com = re.compile(r"<a(?P<aaa>.*?)</a>")

cont = re.findall(com, content)

print(cont)

if __name__ == '__main__':

pool = Pool()

for path in urls:

tar = pool.apply_async(func1, args=(path,), callback=func2)

pool.close()

pool.join()

以上是 python 进程 的全部内容, 来源链接: utcz.com/z/387916.html

回到顶部