【Python】八.进程和线程

[TOC]

一.进程

1.相关概念

  • 什么是程序?

程序:例如XXXX.py这是程序,处于静态的。

  • 什么是进程

进程:一个程序运行起来后,代码+用到的资源称之为进程,它是操作系统分配资源的基本单元。

在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;
在当代面向线程设计的计算机结构中,进程是线程的容器。

  • 同步/异步

所谓同步就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,这是一种可靠的任务序列。
所谓异步是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

  • 阻塞/非阻塞

阻塞和非阻塞跟同步和异步无关,主要与程序等待消息通知时的状态有关。也就是说阻塞与非阻塞主要是从程序(线程)等待消息通知时的状态角度来讲的。

  • 并发/并行

1)并行,parallel 强调同一时刻同时执行
2)并发 concurrency 则指的一个时间段内去一起执行

2.进程的状态

【Python】八.进程和线程
就绪态:运行的条件都已经慢去,正在等在cpu执行
执行态:cpu正在执行其功能
等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态

3.Python中使用多进程

multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情

  • 示例1

from multiprocessing import Process

import time

def run_process():

while True:

print("子进程----2----")

time.sleep(1)

if __name__=='__main__':

p = Process(target=run_process) # target指定目标函数

p.start()

while True:

print("主进程----1----")

time.sleep(1)

Process语法:
Process([group [, target [, name [, args [, kwargs]]]]])

参数--------------------------
target:如果传递了函数的引用,可以任务这个子进程就执行这里的代码
args:给target指定的函数传递的参数,以元组的方式传递
kwargs:给target指定的函数传递命名参数
name:给进程设定一个名字,可以不设定
group:指定进程组,大多数情况下用不到
Process创建的实例对象的常用方法:

方法--------------------------
start():启动子进程实例(创建子进程)
is_alive():判断进程子进程是否还在活着
join([timeout]):是否等待子进程执行结束,或等待多少秒
terminate():不管任务是否完成,立即终止子进程
Process创建的实例对象的常用属性:

属性-------------------------
name:当前进程的别名,默认为Process-N,N为从1开始递增的整数
pid:当前进程的pid(进程号)

  • 示例2 进程pid

from multiprocessing import Process

import time

import os

def run_process():

while True:

print("子进程----pid:{}----".format(os.getpid()))

print()

time.sleep(1)

if __name__=='__main__':

p = Process(target=run_process)

p.start()

while True:

print("主进程----pid:{}----".format(os.getpid()))

time.sleep(1)

  • 示例3 子进程目标方法传参

from multiprocessing import Process

import time

import os

def run_process(course, teacher, *args, **kwargs):

while True:

print("子进程----pid:{}----{}上{}课".format(os.getpid(), teacher, course))

print()

time.sleep(1)

if __name__=='__main__':

p = Process(target=run_process, args=('语文',), kwargs={'teacher':'张三'})

p.start()

while True:

print("主进程----pid:{}----{}上{}课".format(os.getpid(),'李四','数学'))

time.sleep(1)

  • 示例4 进程间不会共享全局变量

from multiprocessing import Process

import time

import os

num_list = [0 , 1, 3, 4, 5, 6, 7, 8, 9, 10]

i = 3

def run_process1():

global i

while i:

print("子进程----pid:{}----".format(os.getpid()))

num_list.pop()

print(num_list)

i = i - 1

time.sleep(1)

def run_process2():

global i

while i:

print("子进程----pid:{}----".format(os.getpid()))

num_list.append(i+1)

print(num_list)

i = i - 1

time.sleep(1)

if __name__=='__main__':

p = Process(target=run_process1)

p.start()

p = Process(target=run_process2)

p.start()

输出

子进程----pid:10187----

[0, 1, 3, 4, 5, 6, 7, 8, 9]

子进程----pid:10188----

[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4]

子进程----pid:10187----

[0, 1, 3, 4, 5, 6, 7, 8]

子进程----pid:10188----

[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3]

子进程----pid:10187----

[0, 1, 3, 4, 5, 6, 7]

子进程----pid:10188----

[0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 4, 3, 2]

4.进程间通信

可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序。

示例

from multiprocessing import Process, Queue

import time, random

def worker(q):

"""完成文件"""

for i in range(10):

file_num = random.randint(1, 100)

print('已完成工作{}...'.format(file_num))

q.put(file_num)

time.sleep(1)

def boss(q):

"""查看文件"""

while True:

if not q.empty():

file_num = q.get(True)

print('已查看工作{}...'.format(file_num))

time.sleep(1)

else:

break

if __name__=='__main__':

# 创建Queue,并传给各个子进程:

q = Queue(5)

pw = Process(target=worker, args=(q,))

pb = Process(target=boss, args=(q,))

pw.start()

# pw.join()

pb.start()

# pb.join()

5.进程池

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。

示例

from multiprocessing import Pool

import os, time, random

def worker(msg):

t_start = time.time()

print("start pro {},pid为{}".format(msg,os.getpid()))

# random.random()随机生成0~1之间的浮点数

time.sleep(random.random()*2)

t_stop = time.time()

print(msg,"执行完毕,耗时{:.2f}" .format(t_stop-t_start))

# 定义一个进程池,最大进程数3

po = Pool(5)

for i in range(0,10):

# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))

# 每次循环将会用空闲出来的子进程去调用目标

po.apply_async(worker,(i,))

print("----start----")

time.sleep(10)

po.close() # 关闭进程池,关闭后po不再接收新的请求

po.join() # 等待po中所有子进程执行完成,必须放在close语句之后

print("-----end-----")

6.进程间进程的通信

使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue()

示例

from multiprocessing import Pool, Manager

import time, random

def worker(q):

"""完成文件"""

for i in range(10):

file_num = random.randint(1, 100)

print('已完成工作{}...'.format(file_num))

q.put(file_num)

time.sleep(1)

def boss(q):

"""查看文件"""

while True:

if not q.empty():

file_num = q.get(True)

print('已查看工作{}...'.format(file_num))

time.sleep(1)

else:

break

if __name__=='__main__':

# 创建Queue,并传给各个子进程:

q = Manager().Queue()

po = Pool()

po.apply_async(worker, (q,))

time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据

po.apply_async(boss, (q,))

po.close()

po.join()

二.线程

现在操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程都拥有自己独立的堆栈。

线程同样有着类似进程的状态
1)运行态:该时刻,该线程正在占用CPU
2)就绪态:可随时转换为运行态,因为其他线程正在运行而暂停,该进程不占用CPU
3)阻塞态:除非某些外部事件发生,否则线程不能运行

Python线程的操作可以使用threading模块,threading模块是对更底层thread做了一些包装的,可以更加方便的被使用。

1.threading.Thread

Thread类:
def __init__(self, group=None, target=Nonoe, name=None, args=(), kwargs=None, daemon=None)

target 线程调用的对象,就是目标函数
name 为线程起个名字(可以重名,因为线程区分靠ID,不靠名字)
args,为目标函数传递实参,元组
kwargs, 为目标函数关键字传参,字典

threading的属性和方法
current_thread() 返回当前线程对象
main_thread() 返回主线程对象
active_count() 当前处于alive状态的线程个数
enumerate() 返回所有活着的线程的列表,不包括已经终止的和未开始的线程
get_ident() 返回当前线程的ID,非0整数

Thread实例的属性和方法
name 只是一个名字
ident 线程ID
is_alive() 返回线程是否活着

start() 启动线程,每一个线程必须且只能执行该方法一次
run() 运行线程函数

  • 示例1 线程的启动

import threading

import time

def worker():

for _ in range(10):

time.sleep(0.5)

print('start')

print(threading.get_ident()) # 返回当前线程对象线程id

print('Thread over')

t = threading.Thread(target=worker)

t.start()

  • 示例2 多线程

import threading

import time

def finish_working():

for i in range(5):

print("线程:{} --完成工作加{}".format(threading.currentThread(), i))

print(threading.current_thread())

time.sleep(1)

if __name__ == "__main__":

for i in range(5):

t = threading.Thread(target=finish_working, name=str(i))

t.start() #启动线程,即让线程开始执行

  • .线程执行代码的封装

通过使用threading模块能完成多任务的程序开发,为了让每个线程的封装性更完美,所以使用threading模块时,往往会定义一个新的子类class,只要继承threading.Thread就可以了,然后重写run方法。

import threading

import time

class MyThread(threading.Thread):

def run(self):

print('run')

super().run()

def start(self):

print('start')

super().start()

def worker1():

for _ in range(5):

time.sleep(0.5)

print('线程:{}-woring'.format(threading.currentThread()))

print('Thread over')

t = MyThread(target=worker1,name='w')

t.start()

  • 线程之间共享全局变量

import time

count = 100

def work1():

global count

for i in range(3):

count += 1

print("----in work1, g_num is {}---".format(count))

def work2():

global count

print("----in work2, g_num is {}---".format(count))

print("---线程创建之前g_num is {}---".format(count))

t1 = Thread(target=work1)

t1.start()

#延时一会,保证t1线程中的事情做完

time.sleep(1)

t2 = Thread(target=work2)

t2.start()

输出

---线程创建之前g_num is 100---

----in work1, g_num is 103---

----in work2, g_num is 103---

2.线程同步

线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。

  • Event

Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。

名称含义
set()标记设置为True
clear()标记设置为False
is_set()标记是否设置为True
wait(timeout=None)设置等待标记为True的时长,None为无限等待。得到返回True, 未等到超时了返回False。

from threading import Event, Thread

import time

def boss(event:Event):

"""

等待员工所有任务完成,点评

"""

print("I'm boss, waiting for u.")

event.wait()

print('good job')

def worker(event:Event, count=10):

print("I am working for u")

cups = []

while True:

print('make 1')

time.sleep(0.5)

cups.append(1)

if len(cups) >= count:

event.set()

break

print('I finished my job. cups={}'.format(cups))

event = Event()

w = Thread(target=worker, args=(event, ))

b = Thread(target=boss, args=(event, ))

w.start()

time.sleep(1)

b.start(

总结:
使用同一个event对象的标记flag
谁wait就是等到flag变为True,或者等到超时返回False,不限制等待的个数。

  • Lock

锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者都可以完全使用这个资源。
示例 不加锁:

import threading

cups = []

def worker(task=100):

while True:

count = len(cups)

print(count)

if count >= task:

break

cups.append(1)

print('{}'.format(threading.current_thread()))

print('I finished {} cups'.format(count))

for x in range(10):

threading.Thread(target=worker, args=(100, )).start()

以上任务完成的数量会大于100,使用锁可以解决
示例

import logging

import threading

logging.basicConfig(level=logging.INFO)

cups = []

# 实例一把锁

lock = threading.Lock()

def worker(lock:threading.Lock,task=100):

while True:

lock.acquire() # 加锁

count = len(cups)

logging.info(count)

if count >= task:

lock.release() #记得退出循环时释放锁

break

cups.append(1)

lock.release() # 释放锁

logging.info('{}'.format(threading.current_thread()))

logging.info('I finished {} cups'.format(count))

for x in range(10):

threading.Thread(target=worker, args=(lock, 100)).start()

一般来说加锁后还有一些代码实现,在释放锁之前还有可能抛异常,一旦出现异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。

加锁、解锁常用语句:
1)使用try...finally语句保证锁的释放
2)with上下文管理,锁对象支持上下文管理

示例:

from threading import Thread, Lock

import time, logging

logging.basicConfig(level=logging.INFO)

class Counter:

def __init__(self):

self.c = 0

self.lock = Lock()

def inc(self):

try:

self.lock.acquire()

self.c += 1

logging.info('add {}'.format(self.c))

finally:

self.lock.release()

def dec(self):

try:

self.lock.acquire()

self.c -= 1

logging.info('sub {}'.format(self.c))

finally:

self.lock.release()

@property

def value(self):

with self.lock:

return self.c

def do(c:Counter, count=100):

for _ in range(count):

for i in range(-50, 50):

if i < 0:

c.dec()

else:

c.inc()

c = Counter()

c1 = 10

c2 = 10

for i in range(c1):

Thread(target=do, args=(c,c2)).start()

time.sleep(5)

logging.info(c.value)

3.Condition

Condition 用于生产者、消费模型,为了解决生产者消费速度匹配问题。

构造方法Condition(lock=None), 可以传入一个lock或者RLock对象,默认是RLock。

名称含义
acquire(*args)获取锁
wait(self, timoout=None)等待或超时间
notify(n=1)唤醒至多指定个数的等待的线程,没有等待的线程没有操作
notiy_all()唤醒所有等待的线程

示例1 不使用Condition

import threading

import logging

import random

logging.basicConfig(level=logging.INFO)

class Dispatcher:

def __init__(self, data=0):

self.data = data

self.event = threading.Event()

def produce(self):

for i in range(100):

data = random.randint(1,100)

self.data = data

logging.info("produce--{}".format(self.data))

self.event.wait(1)

def custom(self):

while True:

logging.info("curstom---{}".format(self.data))

self.event.wait(1)

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c.start()

p.start()

示例2 使用Condition

import threading

import logging

import random

logging.basicConfig(level=logging.INFO)

class Dispatcher:

def __init__(self, data=0):

self.data = data

self.event = threading.Event()

self.cond = threading.Condition()

def produce(self):

for i in range(100):

data = random.randint(1,100)

with self.cond:

self.data = data

self.cond.notify_all()

logging.info('produce {}'.format(self.data))

self.event.wait(1)

def custom(self):

while True:

with self.cond:

self.cond.wait()

logging.info('custom {}'.format(self.data))

self.event.wait(0.5)

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c.start()

p.start()

示例3 多个消费者

import threading

import logging

import random

logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')

class Dispatcher:

def __init__(self, data=0):

self.data = data

self.event = threading.Event()

self.cond = threading.Condition()

def produce(self):

for i in range(100):

data = random.randint(1,100)

with self.cond:

self.data = data

self.cond.notify(1)

logging.info('pru {}'.format(self.data))

self.event.wait(1)

def custom(self):

while True:

with self.cond:

self.cond.wait()

logging.info("线程{}--消费{}".format(threading.get_ident(), self.data))

self.event.wait(0.5)

d = Dispatcher()

p = threading.Thread(target=d.produce)

c = threading.Thread(target=d.custom)

c1 = threading.Thread(target=d.custom)

c.start()

c1.start()

p.start()

总结:
Condition是用于生产者消费者模型中,解决生产者消费者速度匹配的问题。
采用通知机制,非常有效率。

使用方式:
使用Condition必须先acquire,用玩release,因为内部使用锁,默认使用RLock,最好的方式是使用with上下文。
消费者wait,等待通知。
生产者生产好消息,对消费者发通知,可以使用notify或者notify_all方法。

4 .Barrier

名称含义
Barrier(parties, action=None, timeout=None)构建Barrier对象,指定参与方数目。timeout是wait方法未指定超时的默认值。
n_waiting当前在屏障中等待的线程数
parties各方数,就是需要多少个等待
wait(timeout=None)等待通过屏障,返回0到[线程数-1]的整数,每个线程返回不同。如果wait方法设置了超时,并超时发送,屏障将处于broken状态。

方法:

名称含义
broken如果屏障处于打破状态,返回True
abort()将屏障至于broken状态,等待中的线程或者调用等待方法的线程都会抛出BrokenBarrierError异常, 直到reset方法来恢复屏障
reset()恢复屏障,重新开始拦截

示例

import threading

import logging

logging.basicConfig(level=logging.INFO, format='%(thread)d %(threadName)s %(message)s')

def worker(barrier:threading.Barrier):

logging.info('n_waiting={}'.format(barrier.n_waiting))

try:

bid = barrier.wait()

logging.info("after barrier {}".format(bid))

except threading.BrokenBarrierError:

logging.info('Broken Barrier in {}'.format(threading.current_thread().name))

barrier = threading.Barrier(3)

for i in range(5): #调整数字看结果

threading.Thread(target=worker, args=(barrier, )).start()

所有线程冲到了Barrier前等待,直到到达parties的数目,屏障打开,所有线程停止等待,继续执行。
再有线程wait,屏障就绪等到到达参数方数目。

Barrier应用场景:
并发初始化
所有线程都必须初始化完成后,才能继续工作,例如运行前加载数据、检查,如果这些工作没完成,就开始运行,将不能正常工作。
10个线程做10种准备工作,每个线程负责一种工作,只有这10个线程都完成后,才能继续工作,先完成的要等待后完成的线程。
例如,启动一个程序,需要先加载磁盘文件、缓存预热、初始化连接池等工作。这些工作可以齐头并进,不过只有都满足了,程序才能继续向后执行。假设数据库连接失败,则初始化工作失败,就要about,屏障broken,所有线程收到异常退出。

以上是 【Python】八.进程和线程 的全部内容, 来源链接: utcz.com/a/88891.html

回到顶部