09_解决进程间通信线程间通信的资源竞争同步互斥机制
1.同步和互斥
1.目的: 对共有资源的操作会产生争夺,同步互斥是一种解决争夺的方案
2.临界资源: 多个进程或线程都可以操作的资源
3.临界区: 操作临界资源的代码段
4.同步:
同步是一种合作关系,为完成某个任务多进程或多线程之间形成一种协调,按照条件依次执行传递告知资源情况,这种协调可能是因为阻塞关系达成的
同步就是协同步调,按预定的先后次序进行运行,如:消息队列通信
进程(线程)同步可理解为进程(线程)A和B一块配合,A执行到一定程度时要依靠B的某个结果于是停下来示意B运行;B执行再将结果给A;A再继续操作
5.互斥:
互斥是一种制约关系,一个进程(线程)进入到临界区会进行加锁操作,其它进程(线程)在企图操作临界资源就会阻塞,只有当资源被释放才能进行操作
当多个线程同时修改共享数据的时候,需要进行同步控制,使用互斥锁则保证了每次只有一个线程进行写入操作,保证了多线程情况下数据的正确性
某个线程要更改共享数据时,先将其锁定此时资源即上锁,其他线程不能更改;直到该线程释放资源即解锁,其他的线程才能再次锁定该资源
2.进程事件-Event
1.进程事件概述: 一个进程通过对Event的事件状态的设置,另外一个进程判断事件状态来确认是阻塞等待还是继续执行
2.语法概述
from multiprocessing import Evente
= Event() # 创建事件对象e.wait() # 提供事件阻塞
e.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
e.clear() # 清除该事件对象的set
e.is_set() # 监测对象是否被设置,设置返回True
3.临界资源操作示例
from multiprocessing import Processfrom multiprocessing import current_processfrom multiprocessing import Eventimport timedef wait_event(e):print("子进程%s阻塞等待主进程开放临界区" % current_process())e.wait()
print("子进程%s开始操作临界区" % current_process(), e.is_set())def wait_event_timeout(e):print("子进程%s阻塞等待主进程开放临界区" % current_process())e.wait(
3)print("子进程%s等待3秒后不再等待主进程开放临界区" % current_process(), e.is_set())print("子进程%s开始执行其他操作" % current_process(), e.is_set())def main():e
= Event()p1
= Process(target=wait_event, name="block", args=(e,))p2
= Process(target=wait_event_timeout, name="non-block", args=(e,))p1.start()
p2.start()
print("主进程正在操作临界资源")print("-" * 50)time.sleep(
6)e.set()
print("-" * 50)print("主进程结束对临界资源的操作,开放临界区")p1.join()
p2.join()
if__name__ == "__main__":main()
"""执行结果主进程正在操作临界资源
--------------------------------------------------
子进程<Process(block, started)>阻塞等待主进程开放临界区
子进程<Process(non-block, started)>阻塞等待主进程开放临界区
子进程<Process(non-block, started)>等待3秒后不再等待主进程开放临界区 False
子进程<Process(non-block, started)>开始执行其他操作 False
--------------------------------------------------
主进程结束对临界资源的操作,开放临界区
子进程<Process(block, started)>开始操作临界区 True
"""
4.红绿灯示例
from multiprocessing import Processfrom multiprocessing import Eventimport timeimport randomdef tra(e):"""信号灯函数"""# e.set()
# print("33[32m 绿灯亮! 33[0m")
while 1: # 红绿灯得一直亮着,要么是红灯要么是绿灯
if e.is_set(): # True,代表绿灯亮,那么此时代表可以过车
time.sleep(5) # 所以在这让灯等5秒钟,这段时间让车过
print("33[31m 红灯亮! 33[0m") # 绿灯亮了5秒后应该提示到红灯亮
e.clear() # 把is_set设置为False
else:
time.sleep(5) # 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒
print("33[32m 绿灯亮! 33[0m") # 红的亮够5秒后,该绿灯亮了
e.set() # 将is_set设置为True
def Car(i,e):
e.wait() # 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车
print("第%s辆车过去了"%i)
if__name__ == "__main__":
e = Event()
triff_light = Process(target=tra, args=(e,)) # 信号灯的进程
triff_light.start()
for i in range(50): # 描述50辆车的进程
if i % 3 == 0:
time.sleep(2)
car = Process(target=Car, args=(i + 1, e,))
car.start()
3.进程锁-Lock
1.进程锁概述: 在lock对象处于上锁状态时,再企图上锁则会阻塞,直到锁被释放才能继续执行上锁操作
2.语法概述
from multiprocess import Locklock
= Lock() # 创建锁对象lock.acquire() # 上锁
lock.release() # 解锁
3.上下文管理器实现锁管理
from multiprocess import Locklock
= Lock() # 创建锁对象#
给with代码段上锁,with代码的结束自动解锁with lock:
pass
4.进程锁的实现
from multiprocessing import Processfrom multiprocessing import Lockfrom time import sleepimport sysdef worker1(lock):lock.acquire()
# 上锁for _ in range(5):
sleep(1)
# sys.stdout为所有进程共有资源
sys.stdout.write("worker1输出
")
lock.release() # 释放锁
def worker2(lock):
lock.acquire() # 上锁
for _ in range(5):
sleep(1)
sys.stdout.write("worker2输出
")
lock.release() # 释放锁
def main():
# 创建Lock对象
lock = Lock()
p1 = Process(target=worker1, args=(lock,))
p2 = Process(target=worker2, args=(lock,))
p1.start()
p2.start()
p1.join()
p2.join()
if__name__ == "__main__":
main()
"""执行结果
worker1输出
worker1输出
worker1输出
worker1输出
worker1输出
worker2输出
worker2输出
worker2输出
worker2输出
worker2输出
"""
4.进程条件变量-Condition
1.进程条件变量概述: Condition条件变量通常与一个锁关联
需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例
除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态
直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定
2.语法
from multiprocessing import Conditioncon
= Conditioncon.acquire(): 进程锁
con.release(): 释放锁
con.wait(timeout)
进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行,timeout超时时间,秒
wait()必须在已获得Lock前提下才能调用否则会触发RuntimeError
con.notify(n
=1)通知其他进程,那些挂起的进程接到这个通知之后会开始运行,默认是通知一个正等待该condition的进程,最多则唤醒n个等待的进程
notify()必须在已获得Lock前提下才能调用否则会触发RuntimeError,notify()不会主动释放Lock
con.notifyAll(): 如果wait状态进程比较多,notifyAll的作用就是通知所有进程
3.进程条件变量的实现
import multiprocessing, timedef A(cond):name
= multiprocessing.current_process().nameprint("%s进程开始执行A函数" % name)with cond:
print("进程%s执行完成,通知其他进程可以执行" % name)cond.notify_all()
# 通知其他进程,那些挂起的进程接到这个通知之后会开始运行def B(cond):
name = multiprocessing.current_process().name
print("%s进程开始执行B函数" % name)
with cond:
cond.wait() # 进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行
print("%s进程继续执行B函数" % name)
def main():
# 创建进程条件变量
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=A, args=(cond,))
p_list = [multiprocessing.Process(target=B, name="Process2[%d]" % i, args=(cond,)) for i in range(1, 3)]
# 开始进程
for i in p_list:
i.start()
time.sleep(1)
p.start()
# 阻塞等待回收进程
p.join()
for i in p_list:
i.join()
if__name__ == "__main__":
main()
"""执行结果
Process2[1]进程开始执行B函数
Process2[2]进程开始执行B函数
Process-1进程开始执行A函数
进程Process-1执行完成,通知其他进程可以执行
Process2[1]进程继续执行B函数
Process2[2]进程继续执行B函数
"""
5.线程事件-Event
1.语法概述
from threading import Evente
= Event() # 创建事件对象e.wait() # 提供事件阻塞
e.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
e.clear() # 清除该事件对象的set
e.is_set() # 监测对象是否被设置,设置返回True
2.事件解决线程间的资源竞争
import threadingfrom time import sleepdef fun(event):print("呼叫foo")global ss
= "奔波儿灞"def foo(event):
print("等待口令")
sleep(2)
if s == "奔波儿灞":
print("收到的口令是: %s" % s)
else:
print("口令错误...")
event.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞
def bar(event):
print("bar开始执行")
sleep(1)
event.wait() # 提供事件阻塞
global s
s = "霸波尔奔"
def main():
s = None
event = threading.Event()
t1 = threading.Thread(name="fun", target=fun, args=(event,))
t2 = threading.Thread(name="foo", target=foo, args=(event,))
t3 = threading.Thread(name="bar", target=bar, args=(event,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
if__name__ == "__main__":
main()
"""执行结果
呼叫foo
等待口令
bar开始执行
收到的口令是: 奔波儿灞
"""
3.事件实现线程间的同步机制
from threading import Thread, Eventimport time, randomdef conn_mysql(e, i):count
= 1while count <= 3:
if e.is_set():
print("第%s个人连接成功!" % i)
break
print("正在尝试第%s次重新连接..." % (count))
e.wait(0.5)
count += 1
def check_mysql(e):
print("33[42m 数据库正在维护 33[0m")
time.sleep(random.randint(1, 2))
e.set()
if__name__ == "__main__":
e = Event()
t_check = Thread(target=check_mysql, args=(e,))
t_check.start()
for i in range(10):
t_conn = Thread(target=conn_mysql, args=(e, i))
t_conn.start()
6.线程锁-Lock
1.语法概述
lock = Lock() # 创建锁lock.acquire() # 加锁
lock.release() # 解锁
2.线程锁解决线程间的资源竞争
import threadingimport timeg_num
= 0def test1(num):global g_numfor i in range(num):mutex.acquire()
# 上锁g_num += 1
mutex.release() # 解锁
print("---test1---g_num=%d" % g_num)
def test2(num):
global g_num
for i in range(num):
mutex.acquire() # 上锁
g_num += 1
mutex.release() # 解锁
print("---test2---g_num=%d" % g_num)
# 创建一个互斥锁,默认是未上锁的状态
mutex = threading.Lock()
# 创建2个线程,让他们各自对g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()
p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()
# 等待计算完成
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
3.线程锁的死锁问题-添加超时时间避免死锁
# 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁#
避免死锁:程序设计时要尽量避免(银行家算法), 添加超时时间等import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name + "----do1---up----")
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name + "----do1---down----")
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name + "----do2---up----")
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name + "----do2---down----")
mutexA.release()
# 对mutexB解锁
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if__name__ == "__main__":
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
4.线程锁的死锁问题-通过递归锁RLock避免死锁
from threading import Threadfrom threading import RLockimport time# RLock是递归锁 --- 是无止尽的锁,但是所有锁都有一个共同的钥匙#
想解决死锁,配一把公共的钥匙就可以了def man(l_tot, l_pap):
l_tot.acquire() # 是男的获得厕所资源,把厕所锁上了
print("Kali在厕所上厕所")
time.sleep(1)
l_pap.acquire() # 男的拿纸资源
print("Kali拿到卫生纸了!")
time.sleep(0.5)
print("Kali完事了!")
l_pap.release() # 男的先还纸
l_tot.release() # 男的还厕所
def woman(l_tot, l_pap):
l_pap.acquire() # 女的拿纸资源
print("Coco拿到卫生纸了!")
time.sleep(1)
l_tot.acquire() # 是女的获得厕所资源,把厕所锁上了
print("Coco在厕所上厕所")
time.sleep(0.5)
print("Coco完事了!")
l_tot.release() # 女的还厕所
l_pap.release() # 女的先还纸
if__name__ == "__main__":
l_tot = RLock()
l_pap = RLock()
t_man = Thread(target=man, args=(l_tot, l_pap))
t_woman = Thread(target=woman, args=(l_tot, l_pap))
t_man.start()
t_woman.start()
7.线程条件变量-Condition
语法概述
# 条件是让程序员自己去调度线程的一个机制con = threading.Condition()
con.acquire() # 对资源加锁,加锁后其他位置再加锁则阻塞
con.release() # 解锁
con.wait() # wait函数只能在加锁状态下使用,wait函数会先解锁然后让线程处于等待通知的阻塞状态
con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作
线程条件变量示例-打压股市
import threadingimport timeclass Gov(threading.Thread):def run(self):global numcon.acquire()
# 对资源加锁,加锁后其他位置再加锁则阻塞while True:
print("开始拉升股市")
num += 1
print("拉升了%s个点" % num)
time.sleep(1)
if num == 5:
print("暂时安全")
con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作
print("不操作状态")
con.wait() # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t2线程发出的通知后结束阻塞并加锁
con.release()
class Consumers(threading.Thread):
def run(self):
global num
con.acquire() # 再对资源加锁,此时会阻塞在这里
while True:
if num > 0:
print("开始打压股市")
num -= 1
print("下降到%s个点" % num)
time.sleep(1)
if num == 0:
print("天台见")
con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作
print("不能再下降了")
con.wait() # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t1线程发出的通知后结束阻塞并加锁
con.release()
def main():
global num
num = 0
# 创建条件变量
global con
con = threading.Condition()
t1 = Gov()
t2 = Consumers()
t1.start()
t2.start()
t1.join()
t2.join()
if__name__ == "__main__":
main()
"""执行结果
开始拉升股市
拉升了1个点
开始拉升股市
拉升了2个点
开始拉升股市
拉升了3个点
开始拉升股市
拉升了4个点
开始拉升股市
拉升了5个点
暂时安全
不操作状态
开始打压股市
下降到4个点
开始打压股市
下降到3个点
开始打压股市
下降到2个点
开始打压股市
下降到1个点
开始打压股市
下降到0个点
天台见
不能再下降了
开始拉升股市
拉升了1个点
"""
线程条件变量示例-吃火锅
import threadingimport timecon
= threading.Condition()num
= 0# 生产者class Producer(threading.Thread):
def__init__(self):
threading.Thread.__init__(self)
def run(self):
# 锁定线程
global num
con.acquire()
while True:
print("开始添加!!!")
num += 1
print("火锅里面鱼丸个数:%s" % str(num))
time.sleep(1)
if num >= 5:
print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")
# 唤醒等待的线程
con.notify() # 唤醒小伙伴开吃啦
# 等待通知
con.wait()
# 释放锁
con.release()
# 消费者
class Consumers(threading.Thread):
def__init__(self):
threading.Thread.__init__(self)
def run(self):
con.acquire()
global num
while True:
print("开始吃啦!!!")
num -= 1
print("火锅里面剩余鱼丸数量:%s" % str(num))
time.sleep(2)
if num <= 0:
print("锅底没货了,赶紧加鱼丸吧!")
con.notify() # 唤醒其它线程
# 等待通知
con.wait()
con.release()
def main():
p = Producer() # 实例化一个生产者对象
c = Consumers() # 实例化一个消费者对象
p.start()
c.start()
if__name__ == "__main__":
main()
以上是 09_解决进程间通信线程间通信的资源竞争同步互斥机制 的全部内容, 来源链接: utcz.com/z/529974.html