python-协程

python

Python由于众所周知的GIL的原因,导致其线程无法发挥多核的并行计算能力(当然,后来有了multiprocessing,可以实现多进程并行)。

既然在GIL之下,同一时刻只能有一个线程在运行,那么对于CPU密集的程序来说,一般选择使用进程;而以I/O为瓶颈的程序正是协程所擅长的

  • 并发:指一个时间段内,有几个程序在同一个cpu上运行,但是任意时刻只有一个程序在cpu上运行。比如说在一秒内cpu切换了100个进程,就可以认为cpu的并发是100。

  • 并行:指任意时刻点上,有多个程序同时运行在cpu上,可以理解为多个cpu,每个cpu独立运行自己程序,互不干扰。并行数量和cpu数量是一致的。

  • 同步: 指调用IO操作时(注意同步和异步只是针对于I/O操作来讲的),必须等待IO操作完成后才开始新的的调用方式。

  • 异步:指调用IO操作时,不必等待IO操作完成就开始新的的调用方式。

  • 阻塞: 指调用函数的时候,当前线程被挂起。

  • 非阻塞: 指调用函数的时候,当前线程不会被挂起,而是立即返回。

协程是什么?

协程(coroutine),又称为微线程,纤程。协程的作用:在执行A函数的时候,可以随时中断,去执行B函数,然后中断继续执行A函数(可以自动切换),单着一过程并不是函数调用(没有调用语句),过程很像多线程,然而协程只有一个线程在执行

优缺点

优点:

1、不需要锁,因为协程就只有一个线程,不存在竞争关系

2、效率高,协程是切换函数执行,没有多进程/多线程切换进程/线程的开销

缺点:

1、无法利用多核,从上面可以知道协程是一个线程,切换的是函数执行

2、其实就是一个程序,执行过程中中断切换到另一个执行函数,然后返回中断的地方继续执行,如果发生阻塞操作那就是阻塞整个程序了

 

发展过程

1. yield/send # 生成器模式,推荐学习一下生成器模式的协程,能过帮助理解执行原理
2. yield from # 新增委派
3. async/await # python3.5+新增关键字,主要替代2中协程装饰器机yield from

我这里主要介绍原生特性async/await 及标准库asyncio(异步)

还有一些第三方库gevent等

async/await关键字

async: 是定义协程函数的关键字,async def,会将函数标记为协程函数

await:必须在协程函数中使用,即必须在async标记的函数中使用,且最重要的是必须是用在可等待对象上,可等待对象有协程、任务、Fetrue,如果没有可等待对象,但是又想知道到这个位置挂起程序,那可以使用await asyncio.sleep(0)

使用示例

我这里先写个简单的看下效果

async def funct(index):    

print("start ", index)

await asyncio.sleep(5) # 睡眠5秒

print("end ", index)

if __name__ == "__main__":

# async

print("async test")

start = time.time()

# 启动10个协程

p_list = [funct(i) for i in range(10)]

# 创建事件循环

loop = asyncio.get_event_loop()

# 直到协程任务全部完成才退出循环

loop.run_until_complete(asyncio.gather(*p_list))

print("async time ", time.time() - start)

  

我们先思考下,按照多任务并发,我们预期是全部一起运行,那总耗时应该在5秒这样的

多进程/多线程的代码可能就很好理解,这里就任务函数加了async/await关键字,会像多进程/多线程那样无序并发的运行吗?我们来看结果吧

async test

start 2

start 6

start 0

start 7

start 1

start 8

start 3

start 9

start 4

start 5

end 2

end 0

end 3

end 5

end 4

end 8

end 9

end 1

end 6

end 7

async time 5.003999948501587

  

可以看出运行结果跟多进程/多线程是一样的效果的,很神奇吧,明明就启动了一个线程,这是怎么做到的呢

协程运行原理

从代码中看,任务执行到 await asyncio.sleep(5) 的时候就会被挂起,然后去执行其他空闲的协程,从而实现并发,起到重要作用的是loop事件循环对象,主要监控跟调度协程执行的,基本流程是这样的

1、启动协程,执行到await位置挂起
2、控制权返回给事件循环对象,查询空闲协程
3、返回步骤1,

流程图

从而实现多进程多线程的并发效果,主要是把耗时的 I/O 操纵异步处理,这时就由事件循环控制其他协程继续执行

爬虫示例

# -*- coding=utf-8 -*-

import asyncio

from queue import Queue

import time

import aiohttp

from lxml import etree

import requests

# 创建队列保存结果

q = Queue()

async def send_request(url):

'''

用来发送请求的方法

:return: 返回网页源码

'''

headers = {

'Host': 'movie.douban.com',

'Referer': 'https://movie.douban.com/top250?start=225&filter=',

'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',

}

# 请求出错时,重复请求3次,

i = 0

while i <= 3:

try:

print("[INFO]请求url:"+url)

async with aiohttp.ClientSession() as session:

async with session.get(url, timeout=5) as resp:

return await resp.text()

except Exception as e:

print('[INFO] %s %s'% (e, url))

i += 1

async def parse_page(url):

'''

解析网站源码,并采用xpath提取 电影名称和平分放到队列中

:return:

'''

response = await send_request(url)

html = etree.HTML(response)

# 获取到一页的电影数据

node_list = html.xpath("//div[@class='info']")

for move in node_list:

# 电影名称

title = move.xpath('.//a/span/text()')[0]

# 评分

score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]

# 将每一部电影的名称跟评分加入到队列

q.put(score + "\t" + title)

def main():

base_url = 'https://movie.douban.com/top250?start='

# 构造所有url

url_list = [parse_page(base_url+str(num)) for num in range(0,225+1,25)]

# 创建协程并执行

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*url_list))

while not q.empty():

print(q.get())

if __name__=="__main__":

start = time.time()

main()

print('[info]耗时:%s'%(time.time()-start))

  

执行结果

.............

9.2 大话西游之大圣娶亲

9.3 熔炉

9.2 龙猫

9.2 无间道

9.2 疯狂动物城

9.3 教父

9.1 当幸福来敲门

9.1 怦然心动

9.2 触不可及

[info]耗时:0.6089999675750732

  

爬取250条记录,用时半秒多,还是很给力的

 

注意

协程发起网络请求需要使用aiohttp,基于协程开发的,requests是同步的,不支持协程

看到有大神说这样可以用,使用run_in_executor执行requests发起网络请求,https://stackoverflow.com/questions/22190403/how-could-i-use-requests-in-asyncio 

asyncio.get_event_loop().run_in_executor(None, requests.get, url)

  

这里写了个示例

# encoding=utf-8

import asyncio

import functools

import requests

async def req(url):

headers = {

'Host': 'movie.douban.com',

'Referer': 'https://movie.douban.com/top250?start=225&filter=',

'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.104 Safari/537.36',

}

resp = await asyncio._get_running_loop().run_in_executor(None,

functools.partial(requests.get,

allow_redirects=False,

headers=headers),

url)

print(reqeust->{}, status:{}".format(url, resp.status_code))

if __name__ == "__main__":

base_url = 'https://movie.douban.com/top250?start='

# 构造所有url

url_list = [req(base_url + str(num)) for num in range(0, 225 + 1, 25)]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*url_list))

  

执行结果如下

reqeust->https://movie.douban.com/top250?start=175, status:200

reqeust->https://movie.douban.com/top250?start=0, status:200

reqeust->https://movie.douban.com/top250?start=200, status:200

reqeust->https://movie.douban.com/top250?start=150, status:200

reqeust->https://movie.douban.com/top250?start=125, status:200

reqeust->https://movie.douban.com/top250?start=100, status:200

reqeust->https://movie.douban.com/top250?start=225, status:200

reqeust->https://movie.douban.com/top250?start=25, status:200

reqeust->https://movie.douban.com/top250?start=50, status:200

reqeust->https://movie.douban.com/top250?start=75, status:200

  

functools是向requests.get函数传入参数,run_in_executor函数是只能传入args的,如果传入kwargs就不行,视同functools.partial可以很好的解决,很好使

以上是 python-协程 的全部内容, 来源链接: utcz.com/z/388952.html

回到顶部