09_解决进程间通信线程间通信的资源竞争同步互斥机制

python

1.同步和互斥

    1.目的: 对共有资源的操作会产生争夺,同步互斥是一种解决争夺的方案

    2.临界资源: 多个进程或线程都可以操作的资源

    3.临界区: 操作临界资源的代码段

    4.同步:
        同步是一种合作关系,为完成某个任务多进程或多线程之间形成一种协调,按照条件依次执行传递告知资源情况,这种协调可能是因为阻塞关系达成的


        同步就是协同步调,按预定的先后次序进行运行,如:消息队列通信


        进程(线程)同步可理解为进程(线程)A和B一块配合,A执行到一定程度时要依靠B的某个结果于是停下来示意B运行;B执行再将结果给A;A再继续操作

    5.互斥:


        互斥是一种制约关系,一个进程(线程)进入到临界区会进行加锁操作,其它进程(线程)在企图操作临界资源就会阻塞,只有当资源被释放才能进行操作


        当多个线程同时修改共享数据的时候,需要进行同步控制,使用互斥锁则保证了每次只有一个线程进行写入操作,保证了多线程情况下数据的正确性


        某个线程要更改共享数据时,先将其锁定此时资源即上锁,其他线程不能更改;直到该线程释放资源即解锁,其他的线程才能再次锁定该资源

2.进程事件-Event

    1.进程事件概述: 一个进程通过对Event的事件状态的设置,另外一个进程判断事件状态来确认是阻塞等待还是继续执行

    2.语法概述

from multiprocessing import Event

e = Event() # 创建事件对象

e.wait() # 提供事件阻塞

e.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞

e.clear() # 清除该事件对象的set

e.is_set() # 监测对象是否被设置,设置返回True

    3.临界资源操作示例

from multiprocessing import Process

from multiprocessing import current_process

from multiprocessing import Event

import time

def wait_event(e):

print("子进程%s阻塞等待主进程开放临界区" % current_process())

e.wait()

print("子进程%s开始操作临界区" % current_process(), e.is_set())

def wait_event_timeout(e):

print("子进程%s阻塞等待主进程开放临界区" % current_process())

e.wait(3)

print("子进程%s等待3秒后不再等待主进程开放临界区" % current_process(), e.is_set())

print("子进程%s开始执行其他操作" % current_process(), e.is_set())

def main():

e = Event()

p1 = Process(target=wait_event, name="block", args=(e,))

p2 = Process(target=wait_event_timeout, name="non-block", args=(e,))

p1.start()

p2.start()

print("主进程正在操作临界资源")

print("-" * 50)

time.sleep(6)

e.set()

print("-" * 50)

print("主进程结束对临界资源的操作,开放临界区")

p1.join()

p2.join()

if__name__ == "__main__":

main()

"""执行结果

主进程正在操作临界资源

--------------------------------------------------

子进程<Process(block, started)>阻塞等待主进程开放临界区

子进程<Process(non-block, started)>阻塞等待主进程开放临界区

子进程<Process(non-block, started)>等待3秒后不再等待主进程开放临界区 False

子进程<Process(non-block, started)>开始执行其他操作 False

--------------------------------------------------

主进程结束对临界资源的操作,开放临界区

子进程<Process(block, started)>开始操作临界区 True

"""

    4.红绿灯示例

from multiprocessing import Process

from multiprocessing import Event

import time

import random

def tra(e):

"""信号灯函数"""

# e.set()

# print("33[32m 绿灯亮! 33[0m")

while 1: # 红绿灯得一直亮着,要么是红灯要么是绿灯

if e.is_set(): # True,代表绿灯亮,那么此时代表可以过车

time.sleep(5) # 所以在这让灯等5秒钟,这段时间让车过

print("33[31m 红灯亮! 33[0m") # 绿灯亮了5秒后应该提示到红灯亮

e.clear() # 把is_set设置为False

else:

time.sleep(5) # 此时代表红灯亮了,此时应该红灯亮5秒,在此等5秒

print("33[32m 绿灯亮! 33[0m") # 红的亮够5秒后,该绿灯亮了

e.set() # 将is_set设置为True

def Car(i,e):

e.wait() # 车等在红绿灯,此时要看是红灯还是绿灯,如果is_set为True就是绿灯,此时可以过车

print("第%s辆车过去了"%i)

if__name__ == "__main__":

e = Event()

triff_light = Process(target=tra, args=(e,)) # 信号灯的进程

triff_light.start()

for i in range(50): # 描述50辆车的进程

if i % 3 == 0:

time.sleep(2)

car = Process(target=Car, args=(i + 1, e,))

car.start()

3.进程锁-Lock

    1.进程锁概述: 在lock对象处于上锁状态时,再企图上锁则会阻塞,直到锁被释放才能继续执行上锁操作

    2.语法概述

from multiprocess import Lock

lock = Lock() # 创建锁对象

lock.acquire() # 上锁

lock.release() # 解锁

    3.上下文管理器实现锁管理

from multiprocess import Lock

lock = Lock() # 创建锁对象

# 给with代码段上锁,with代码的结束自动解锁

with lock:

pass

    4.进程锁的实现

from multiprocessing import Process

from multiprocessing import Lock

from time import sleep

import sys

def worker1(lock):

lock.acquire() # 上锁

for _ in range(5):

sleep(1)

# sys.stdout为所有进程共有资源

sys.stdout.write("worker1输出

")

lock.release() # 释放锁

def worker2(lock):

lock.acquire() # 上锁

for _ in range(5):

sleep(1)

sys.stdout.write("worker2输出

")

lock.release() # 释放锁

def main():

# 创建Lock对象

lock = Lock()

p1 = Process(target=worker1, args=(lock,))

p2 = Process(target=worker2, args=(lock,))

p1.start()

p2.start()

p1.join()

p2.join()

if__name__ == "__main__":

main()

"""执行结果

worker1输出

worker1输出

worker1输出

worker1输出

worker1输出

worker2输出

worker2输出

worker2输出

worker2输出

worker2输出

"""

4.进程条件变量-Condition

    1.进程条件变量概述: Condition条件变量通常与一个锁关联
        需要在多个Contidion中共享一个锁时,可以传递一个Lock/RLock实例给构造方法,否则它将自己生成一个RLock实例


        除了Lock带有的锁定池外,Condition还包含一个等待池,池中的线程处于状态图中的等待阻塞状态


        直到另一个线程调用notify()/notifyAll()通知;得到通知后线程进入锁定池等待锁定

    2.语法

from multiprocessing import Condition

con = Condition

con.acquire(): 进程锁

con.release(): 释放锁

con.wait(timeout)

进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行,timeout超时时间,秒

wait()必须在已获得Lock前提下才能调用否则会触发RuntimeError

con.notify(n=1)

通知其他进程,那些挂起的进程接到这个通知之后会开始运行,默认是通知一个正等待该condition的进程,最多则唤醒n个等待的进程

notify()必须在已获得Lock前提下才能调用否则会触发RuntimeError,notify()不会主动释放Lock

con.notifyAll(): 如果wait状态进程比较多,notifyAll的作用就是通知所有进程

    3.进程条件变量的实现

import multiprocessing, time

def A(cond):

name = multiprocessing.current_process().name

print("%s进程开始执行A函数" % name)

with cond:

print("进程%s执行完成,通知其他进程可以执行" % name)

cond.notify_all() # 通知其他进程,那些挂起的进程接到这个通知之后会开始运行

def B(cond):

name = multiprocessing.current_process().name

print("%s进程开始执行B函数" % name)

with cond:

cond.wait() # 进程挂起,直到收到一个notify通知或者超时才会被唤醒继续运行

print("%s进程继续执行B函数" % name)

def main():

# 创建进程条件变量

cond = multiprocessing.Condition()

p = multiprocessing.Process(target=A, args=(cond,))

p_list = [multiprocessing.Process(target=B, name="Process2[%d]" % i, args=(cond,)) for i in range(1, 3)]

# 开始进程

for i in p_list:

i.start()

time.sleep(1)

p.start()

# 阻塞等待回收进程

p.join()

for i in p_list:

i.join()

if__name__ == "__main__":

main()

"""执行结果

Process2[1]进程开始执行B函数

Process2[2]进程开始执行B函数

Process-1进程开始执行A函数

进程Process-1执行完成,通知其他进程可以执行

Process2[1]进程继续执行B函数

Process2[2]进程继续执行B函数

"""

5.线程事件-Event

    1.语法概述

from threading import Event

e = Event() # 创建事件对象

e.wait() # 提供事件阻塞

e.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞

e.clear() # 清除该事件对象的set

e.is_set() # 监测对象是否被设置,设置返回True

    2.事件解决线程间的资源竞争

import threading

from time import sleep

def fun(event):

print("呼叫foo")

global s

s = "奔波儿灞"

def foo(event):

print("等待口令")

sleep(2)

if s == "奔波儿灞":

print("收到的口令是: %s" % s)

else:

print("口令错误...")

event.set() # 对事件对象进程设置,此时wait判断如果事件被set则结束阻塞

def bar(event):

print("bar开始执行")

sleep(1)

event.wait() # 提供事件阻塞

global s

s = "霸波尔奔"

def main():

s = None

event = threading.Event()

t1 = threading.Thread(name="fun", target=fun, args=(event,))

t2 = threading.Thread(name="foo", target=foo, args=(event,))

t3 = threading.Thread(name="bar", target=bar, args=(event,))

t1.start()

t2.start()

t3.start()

t1.join()

t2.join()

t3.join()

if__name__ == "__main__":

main()

"""执行结果

呼叫foo

等待口令

bar开始执行

收到的口令是: 奔波儿灞

"""

    3.事件实现线程间的同步机制

from threading import Thread, Event

import time, random

def conn_mysql(e, i):

count = 1

while count <= 3:

if e.is_set():

print("第%s个人连接成功!" % i)

break

print("正在尝试第%s次重新连接..." % (count))

e.wait(0.5)

count += 1

def check_mysql(e):

print("33[42m 数据库正在维护 33[0m")

time.sleep(random.randint(1, 2))

e.set()

if__name__ == "__main__":

e = Event()

t_check = Thread(target=check_mysql, args=(e,))

t_check.start()

for i in range(10):

t_conn = Thread(target=conn_mysql, args=(e, i))

t_conn.start()

6.线程锁-Lock

    1.语法概述

lock = Lock()  # 创建锁

lock.acquire() # 加锁

lock.release() # 解锁

    2.线程锁解决线程间的资源竞争

import threading

import time

g_num = 0

def test1(num):

global g_num

for i in range(num):

mutex.acquire() # 上锁

g_num += 1

mutex.release() # 解锁

print("---test1---g_num=%d" % g_num)

def test2(num):

global g_num

for i in range(num):

mutex.acquire() # 上锁

g_num += 1

mutex.release() # 解锁

print("---test2---g_num=%d" % g_num)

# 创建一个互斥锁,默认是未上锁的状态

mutex = threading.Lock()

# 创建2个线程,让他们各自对g_num加1000000次

p1 = threading.Thread(target=test1, args=(1000000,))

p1.start()

p2 = threading.Thread(target=test2, args=(1000000,))

p2.start()

# 等待计算完成

while len(threading.enumerate()) != 1:

time.sleep(1)

print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)

    3.线程锁的死锁问题-添加超时时间避免死锁

# 在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁

# 避免死锁:程序设计时要尽量避免(银行家算法), 添加超时时间等

import threading

import time

class MyThread1(threading.Thread):

def run(self):

# 对mutexA上锁

mutexA.acquire()

# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁

print(self.name + "----do1---up----")

time.sleep(1)

# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了

mutexB.acquire()

print(self.name + "----do1---down----")

mutexB.release()

# 对mutexA解锁

mutexA.release()

class MyThread2(threading.Thread):

def run(self):

# 对mutexB上锁

mutexB.acquire()

# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁

print(self.name + "----do2---up----")

time.sleep(1)

# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了

mutexA.acquire()

print(self.name + "----do2---down----")

mutexA.release()

# 对mutexB解锁

mutexB.release()

mutexA = threading.Lock()

mutexB = threading.Lock()

if__name__ == "__main__":

t1 = MyThread1()

t2 = MyThread2()

t1.start()

t2.start()

    4.线程锁的死锁问题-通过递归锁RLock避免死锁

from threading import Thread

from threading import RLock

import time

# RLock是递归锁 --- 是无止尽的锁,但是所有锁都有一个共同的钥匙

# 想解决死锁,配一把公共的钥匙就可以了

def man(l_tot, l_pap):

l_tot.acquire() # 是男的获得厕所资源,把厕所锁上了

print("Kali在厕所上厕所")

time.sleep(1)

l_pap.acquire() # 男的拿纸资源

print("Kali拿到卫生纸了!")

time.sleep(0.5)

print("Kali完事了!")

l_pap.release() # 男的先还纸

l_tot.release() # 男的还厕所

def woman(l_tot, l_pap):

l_pap.acquire() # 女的拿纸资源

print("Coco拿到卫生纸了!")

time.sleep(1)

l_tot.acquire() # 是女的获得厕所资源,把厕所锁上了

print("Coco在厕所上厕所")

time.sleep(0.5)

print("Coco完事了!")

l_tot.release() # 女的还厕所

l_pap.release() # 女的先还纸

if__name__ == "__main__":

l_tot = RLock()

l_pap = RLock()

t_man = Thread(target=man, args=(l_tot, l_pap))

t_woman = Thread(target=woman, args=(l_tot, l_pap))

t_man.start()

t_woman.start()

7.线程条件变量-Condition

    语法概述

# 条件是让程序员自己去调度线程的一个机制

con = threading.Condition()

con.acquire() # 对资源加锁,加锁后其他位置再加锁则阻塞

con.release() # 解锁

con.wait() # wait函数只能在加锁状态下使用,wait函数会先解锁然后让线程处于等待通知的阻塞状态

con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作

    线程条件变量示例-打压股市

import threading

import time

class Gov(threading.Thread):

def run(self):

global num

con.acquire() # 对资源加锁,加锁后其他位置再加锁则阻塞

while True:

print("开始拉升股市")

num += 1

print("拉升了%s个点" % num)

time.sleep(1)

if num == 5:

print("暂时安全")

con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作

print("不操作状态")

con.wait() # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t2线程发出的通知后结束阻塞并加锁

con.release()

class Consumers(threading.Thread):

def run(self):

global num

con.acquire() # 再对资源加锁,此时会阻塞在这里

while True:

if num > 0:

print("开始打压股市")

num -= 1

print("下降到%s个点" % num)

time.sleep(1)

if num == 0:

print("天台见")

con.notify() # 发送通知,线程接收到通知后结束wait阻塞并且执行acquire加锁操作

print("不能再下降了")

con.wait() # wait先解锁然后让线程处于等待通知的阻塞状态,直到接收到t1线程发出的通知后结束阻塞并加锁

con.release()

def main():

global num

num = 0

# 创建条件变量

global con

con = threading.Condition()

t1 = Gov()

t2 = Consumers()

t1.start()

t2.start()

t1.join()

t2.join()

if__name__ == "__main__":

main()

"""执行结果

开始拉升股市

拉升了1个点

开始拉升股市

拉升了2个点

开始拉升股市

拉升了3个点

开始拉升股市

拉升了4个点

开始拉升股市

拉升了5个点

暂时安全

不操作状态

开始打压股市

下降到4个点

开始打压股市

下降到3个点

开始打压股市

下降到2个点

开始打压股市

下降到1个点

开始打压股市

下降到0个点

天台见

不能再下降了

开始拉升股市

拉升了1个点

"""

    线程条件变量示例-吃火锅

import threading

import time

con = threading.Condition()

num = 0

# 生产者

class Producer(threading.Thread):

def__init__(self):

threading.Thread.__init__(self)

def run(self):

# 锁定线程

global num

con.acquire()

while True:

print("开始添加!!!")

num += 1

print("火锅里面鱼丸个数:%s" % str(num))

time.sleep(1)

if num >= 5:

print("火锅里面里面鱼丸数量已经到达5个,无法添加了!")

# 唤醒等待的线程

con.notify() # 唤醒小伙伴开吃啦

# 等待通知

con.wait()

# 释放锁

con.release()

# 消费者

class Consumers(threading.Thread):

def__init__(self):

threading.Thread.__init__(self)

def run(self):

con.acquire()

global num

while True:

print("开始吃啦!!!")

num -= 1

print("火锅里面剩余鱼丸数量:%s" % str(num))

time.sleep(2)

if num <= 0:

print("锅底没货了,赶紧加鱼丸吧!")

con.notify() # 唤醒其它线程

# 等待通知

con.wait()

con.release()

def main():

p = Producer() # 实例化一个生产者对象

c = Consumers() # 实例化一个消费者对象

p.start()

c.start()

if__name__ == "__main__":

main()

以上是 09_解决进程间通信线程间通信的资源竞争同步互斥机制 的全部内容, 来源链接: utcz.com/z/529974.html

回到顶部