Python队列、进程间通信、线程案例

进程互斥锁

多进程同时抢购余票

# 并发运行,效率高,但竞争写同一文件,数据写入错乱

# data.json文件内容为 {"ticket_num": 1}

import json

import time

from multiprocessing import Process

def search(user):

with open('data.json', 'r', encoding='utf-8') as f:

dic = json.load(f)

print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}...')

def buy(user):

with open('data.json', 'r', encoding='utf-8') as f:

dic = json.load(f)

time.sleep(0.1)

if dic['ticket_num'] > 0:

dic['ticket_num'] -= 1

with open('data.json', 'w', encoding='utf-8') as f:

json.dump(dic, f)

print(f'用户{user}抢票成功!')

else:

print(f'用户{user}抢票失败')

def run(user):

search(user)

buy(user)

if __name__ == '__main__':

for i in range(10): # 模拟10个用户抢票

p = Process(target=run, args=(f'用户{i}', ))

p.start()

使用锁来保证数据安全

# data.json文件内容为 {"ticket_num": 1}

import json

import time

from multiprocessing import Process, Lock

def search(user):

with open('data.json', 'r', encoding='utf-8') as f:

dic = json.load(f)

print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}...')

def buy(user):

with open('data.json', 'r', encoding='utf-8') as f:

dic = json.load(f)

time.sleep(0.2)

if dic['ticket_num'] > 0:

dic['ticket_num'] -= 1

with open('data.json', 'w', encoding='utf-8') as f:

json.dump(dic, f)

print(f'用户{user}抢票成功!')

else:

print(f'用户{user}抢票失败')

def run(user, mutex):

search(user)

mutex.acquire() # 加锁

buy(user)

mutex.release() # 释放锁

if __name__ == '__main__':

# 调用Lock()类得到一个锁对象

mutex = Lock()

for i in range(10): # 模拟10个用户抢票

p = Process(target=run, args=(f'用户{i}', mutex))

p.start()

进程互斥锁:

让并发变成串行,牺牲了执行效率,保证了数据安全

在程序并发时,需要修改数据使用

队列

队列遵循的是先进先出

队列:相当于内存中一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面。

q.put() 添加数据

q.get() 取数据,遵循队列先进先出

q.get_nowait() 获取队列数据, 队列中没有就会报错

q.put_nowait 添加数据,若队列满了也会报错

q.full() 查看队列是否满了

q.empty() 查看队列是否为空

from multiprocessing import Queue

# 调用队列类,实例化队列对象

q = Queue(5) # 队列中存放5个数据

# put添加数据,若队列里的数据满了就会卡住

q.put(1)

print('进入数据1')

q.put(2)

print('进入数据2')

q.put(3)

print('进入数据3')

q.put(4)

print('进入数据4')

q.put(5)

print('进入数据5')

# 查看队列是否满了

print(q.full())

# 添加数据, 若队列满了也会报错

q.put_nowait(6)

# q.get() 获取的数据遵循先进先出

print(q.get())

print(q.get())

print(q.get())

print(q.get())

print(q.get())

# print(q.get())

print(q.get_nowait()) # 获取队列数据, 队列中没有就会报错

# 判断队列是否为空

print(q.empty())

q.put(6)

print('进入数据6')

进程间通信

IPC(Inter-Process Communication)

进程间数据是相互隔离的,若想实现进程间通信,可以利用队列

from multiprocessing import Process, Queue

def task1(q):

data = 'hello 你好'

q.put(data)

print('进程1添加数据到队列')

def task2(q):

print(q.get())

print('进程2从队列中获取数据')

if __name__ == '__main__':

q = Queue()

p1 = Process(target=task1, args=(q, ))

p2 = Process(target=task2, args=(q, ))

p1.start()

p2.start()

print('主进程')

生产者与消费者

在程序中,通过队列生产者把数据添加到队列中,消费者从队列中获取数据

from multiprocessing import Process, Queue

import time

# 生产者

def producer(name, food, q):

for i in range(10):

data = food, i

msg = f'用户{name}开始制作{data}'

print(msg)

q.put(data)

time.sleep(0.1)

# 消费者

def consumer(name, q):

while True:

data = q.get()

if not data:

break

print(f'用户{name}开始吃{data}')

if __name__ == '__main__':

q = Queue()

p1 = Process(target=producer, args=('neo', '煎饼', q))

p2 = Process(target=producer, args=('wick', '肉包', q))

c1 = Process(target=consumer, args=('cwz', q))

c2 = Process(target=consumer, args=('woods', q))

p1.start()

p2.start()

c1.daemon = True

c2.daemon = True

c1.start()

c2.start()

print('主')

线程

线程的概念

进程与线程都是虚拟单位

进程:资源单位

线程:执行单位

开启一个进程,一定会有一个线程,线程才是真正执行者

开启进程:

  • 开辟一个名称空间,每开启一个进程都会占用一份内存资源
  • 会自带一个线程

开启线程:

  • 一个进程可以开启多个线程
  • 线程的开销远小于进程

注意:线程不能实现并行,线程只能实现并发,进程可以实现并行

线程的两种创建方式

from threading import Thread

import time

# 创建线程方式1

def task():

print('线程开启')

time.sleep(1)

print('线程结束')

if __name__ == '__main__':

t = Thread(target=task)

t.start()

# 创建线程方式2

class MyThread(Thread):

def run(self):

print('线程开启...')

time.sleep(1)

print('线程结束...')

if __name__ == '__main__':

t = MyThread()

t.start()

线程对象的方法

from threading import Thread

from threading import current_thread

import time

def task():

print(f'线程开启{current_thread().name}')

time.sleep(1)

print(f'线程结束{current_thread().name}')

if __name__ == '__main__':

t = Thread(target=task)

print(t.isAlive())

# t.daemon = True

t.start()

print(t.isAlive())

线程互斥锁

线程之间数据是共享的

from threading import Thread

from threading import Lock

import time

mutex = Lock()

n = 100

def task(i):

print(f'线程{i}启动')

global n

mutex.acquire()

temp = n

time.sleep(0.1)

n = temp - 1

print(n)

mutex.release()

if __name__ == '__main__':

t_l = []

for i in range(100):

t = Thread(target=task, args=(i, ))

t_l.append(t)

t.start()

for t in t_l:

t.join()

print(n)

以上是 Python队列、进程间通信、线程案例 的全部内容, 来源链接: utcz.com/z/321443.html

回到顶部