从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。
Python 中使用协程最常用的库莫过于 asyncio,所以本文会以 asyncio 为基础来介绍协程的使用。
首先我们需要了解下面几个概念。
定义携程
首先我们来定义一个协程,体验一下它和普通进程在实现上的不同之处,代码如下:
import asyncio
async def execute(x):
print(‘Number:‘, x)
coroutine1 = execute(1)
print(‘Coroutine:‘, coroutine) # Coroutine: <coroutine object execute at 0x000001ECF6762DC8>
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine) # Number: 1
首先我们引入了 asyncio
这个包,这样我们才可以使用 async
和 await
,然后我们使用 async
定义了一个 execute 方法,方法接收一个数字参数,方法执行之后会打印这个数字。
随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop
方法创建了一个事件循环 loop
,并调用了 loop
对象的 run_until_complete
方法将协程注
册到事件循环 loop
中,然后启动。最后我们才看到了 execute 方法打印了输出结果。
可见,async
定义的方法就会变成一个无法直接执行的 coroutine
协程对象,必须将其注册到事件循环中才可以执行。
上面我们还提到了 task
,它是对 coroutine
对象的进一步封装,它里面相比 coroutine
对象多了运行状态,比如 running
、finished
等,我们可以用这些状态来获取协程对象的执行情况。
在上面的例子中,当我们将 coroutine
对象传递给 run_until_complete
方法的时候,实际上它进行了一个操作就是将 coroutine
封装成了 task
对象,我们也可以显式地进行声明,如下所示:
async def execute(x):
print(‘Number:‘, x)
return x
coroutine = execute(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(‘Task:‘, task) # Task: <Task pending coro=<execute() running at C:/。。。:27>>
loop.run_until_complete(task) # Number: 1
print(‘Task:‘, task) # Task: <Task finished coro=<execute2() done, defined at C:/。。。:27> result=1>
这里我们定义了 loop
对象之后,接着调用了它的 create_task
方法将 coroutine
对象转化为了 task
对象,随后我们打印输出一下,发现它是 pending
状态。接着我们将 task
对象添加到事件循环中得到执行,随后我们再打印输出一下 task
对象,发现它的状态就变成了 finished
,同时还可以看到其 result
变成了 1
,也就是我们定义的 execute
方法的返回结果。
另外定义 task
对象还有一种方式,就是直接通过 asyncio
的 ensure_future
方法,返回结果也是 task
对象,这样的话我们就可以不借助于 loop
来定义,即使我们还没有声明 loop
也可以提前定义好 task
对象,写法如下:
async def execute(x):
print(‘Number:‘, x)
return x
coroutine = execute(1)
task = asyncio.ensure_future(coroutine)
print(‘Task:‘, task) # Task: <Task pending coro=<execute() running at C:/。。。:50>>
loop = asyncio.get_event_loop()
loop.run_until_complete(task) # Number: 1
print(‘Task:‘, task) # Task: <Task finished coro=<execute() done, defined at C:/。。。:50> result=1>
发现其运行效果都是一样的。
绑定回调
另外我们也可以为某个 task 绑定一个回调方法,比如我们来看下面的例子:
async def request():
url = ‘https://www.baidu.com‘
status = requests.get(url)
return status
def callback(_task):
print(‘Task Result:‘, _task.result())
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop = asyncio.get_event_loop()
loop.run_until_complete(task) # Task Result: <Response [200]>
在这里我们定义了一个 request
方法,请求了百度,获取其响应状态,但是这个方法里面我们没有任何 print
语句。随后我们定义了一个 callback
方法,这个方法接收一个参数,是 task
对象,然后调用 print
方法打印
了 task
对象的结果。这样我们就定义好了一个 coroutine
对象和一个回调方法,我们现在希望的效果是,当 coroutine
对象执行完毕之后,就去执行声明的 callback
方法。
那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback
方法即可,我们将 callback
方法传递给了封装好的 task
对象,这样当 task
执行完毕之后就可以调用 callback
方法了,同时 task
对象还会作为参数传递给 callback
方法,调用 task
对象的 result
方法就可以获取返回结果了。
实际上不用回调方法,直接在 task 运行完毕之后也可以直接调用 result
方法获取结果,如下所示:
async def request():
url = ‘https://www.baidu.com‘
status = requests.get(url)
return status
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(‘Task Result:‘, task.result()) # Task Result: <Response [200]>
运行结果是一样的。
多任务协程
上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task
列表,然后使用 asyncio
的 wait
方法即可执行,看下面的例子:
async def request():
url = ‘https://www.baidu.com‘
status = requests.get(url)
return status
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print(‘Tasks:‘, tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print(‘Task Result:‘, task.result())
这里我们使用一个 for 循环创建了五个 task
,组成了一个列表,然后把这个列表首先传递给了 asyncio
的 wait
方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来,运行结果如下:
Tasks: [<Task pending . . . >>, . . . , <Task pending . . . >>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
可以看到五个任务被顺次执行了,并得到了运行结果。
前面讲了这么多,又是 async
,又是 coroutine
,又是 task
,又是 callback
,但似乎并没有看出协程的优势啊?反而写法上更加奇怪和麻烦了,别急,上面的案例只是为后面的使用作铺垫,接下来我们正式来看下协程在解决 IO
密集型任务上有怎样的优势吧!
上面的代码中,我们用一个网络请求作为示例,这就是一个耗时等待
的操作,因为我们请求网页之后需要等待页面响应并返回结果。耗时等待
的操作一般都是 IO
操作,比如文件读取、网络请求等等。协程对于处理这种操作是有很大优势的,当遇到需要等待的情况的时候,程序可以暂时挂起,转而去执行其他的操作,从而避免一直等待一个程序而耗费过多的时间,充分利用资源。
为了表现出协程的优势,我们拿 https://static4.scrape.cuiqingcai.com/ 这个网站为例来进行演示,因为该网站响应比较慢,所以我们可以通过爬取时间来直观地感受到爬取速度的提升。
为了让你更好地理解协程的正确使用方法,这里我们先来看看使用协程时常犯的错误,后面再给出正确的例子来对比一下。
首先,我们还是拿之前的 requests 来进行网页请求,接下来我们再重新使用上面的方法请求一遍:
import asyncio
import requests
import time
start = time.time()
async def request():
url = ‘https://static4.scrape.cuiqingcai.com/‘
print(‘Waiting for‘, url)
response = requests.get(url)
print(‘Get response from‘, url, ‘response‘, response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(‘Cost time:‘, end - start)
在这里我们还是创建了 10 个 task
,然后将 task
列表传给 wait
方法并注册到时间循环中执行。
运行结果如下:
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 49.008238554000854
可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 49 秒,平均一个请求耗时 5 秒,说好的异步处理呢?
其实,要实现异步处理,我们得先要有挂起
的操作,当一个任务需要等待 IO
结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。
要实现异步,接下来我们需要了解一下 await
的用法,使用 await
可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await
,事件循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。
所以,我们可能会将代码中的 request
方法改成如下的样子:
async def request():
url = ‘https://static4.scrape.cuiqingcai.com/‘
print(‘Waiting for‘, url)
response = await requests.get(url)
print(‘Get response from‘, url, ‘response‘, response)
仅仅是在 requests
前面加了一个 await
,然而执行以下代码,会得到如下报错:
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at C:/Users/Administrator/Desktop/spider/16.async_theory.py:128> exception=TypeError("object Response can‘t be used in ‘await‘ expression")>
Traceback (most recent call last):
File "C:/Users/Administrator/Desktop/spider/16.async_theory.py", line 131, in request
response = await requests.get(url)
TypeError: object Response can‘t be used in ‘await‘ expression
这次它遇到 await
方法确实挂起了,也等待了,但是最后却报了这么个错,这个错误的意思是 requests
返回的 Response
对象不能和 await
一起使用,为什么呢?因为根据官方文档说明,await
后面的对象必须是如下格式之一:
coroutine
对象。types.coroutine
修饰的生成器,这个生成器可以返回 coroutine
对象。await
方法的对象返回的一个迭代器。参考:https://www.python.org/dev/peps/pep-0492/#await-expression。
requests
返回的 Response
不符合上面任一条件,因此就会报上面的错误了。
那么你可能会发现,既然 await
后面可以跟一个 coroutine
对象,那么我用 async
把请求的方法改成 coroutine
对象不就可以了吗?所以就改写成如下的样子:
import asyncio
import requests
import time
start = time.time()
async def get(url):
return requests.get(url)
async def request():
url = ‘https://static4.scrape.cuiqingcai.com/‘
print(‘Waiting for‘, url)
response = await get(url)
print(‘Get response from‘, url, ‘response‘, response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(‘Cost time:‘, end - start)
这里我们将请求页面的方法独立出来,并用 async
修饰,这样就得到了一个 coroutine
对象,我们运行一下看看:
Waiting for https://static4.scrape.cuiqingcai.com/
Get response from https://static4.scrape.cuiqingcai.com/ response <Response [200]>
. . .
Cost time: 53.78607153892517
还是不行,它还不是异步执行,也就是说我们仅仅将涉及 IO
操作的代码封装到 async
修饰的方法里面是不可行的!我们必须要使用支持异步操作的请求方式
才可以实现真正的异步,所以这里就需要 aiohttp
派上用场了。
aiohttp
是一个支持异步请求的库,利用它和 asyncio
配合我们可以非常方便地实现异步请求操作。
安装方式:pip3 install aiohttp
官方文档链接为:https://aiohttp.readthedocs.io/,它分为两部分,一部分是 Client
,一部分是 Server
,详细的内容可以参考官方文档。
下面我们将 aiohttp
用上来,将代码改成如下样子:
import asyncio
import aiohttp
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = ‘https://static4.scrape.cuiqingcai.com/‘
print(‘Waiting for‘, url)
response = await get(url)
print(‘Get response from‘, url, ‘response‘, response)
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(‘Cost time:‘, end - start)
在这里我们将请求库由 requests
改成了 aiohttp
,通过 aiohttp
的 ClientSession
类的 get
方法进行请求,结果如下:
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Waiting for https://static4.scrape.cuiqingcai.com/
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy(‘Server‘: ‘nginx/1.17.8‘, ‘Date‘: ‘Wed, 14 Oct 2020 15:52:30 GMT‘, ‘Content-Type‘: ‘text/html; charset=utf-8‘, ‘Transfer-Encoding‘: ‘chunked‘, ‘Connection‘: ‘keep-alive‘, ‘Vary‘: ‘Accept-Encoding‘, ‘X-Frame-Options‘: ‘DENY‘, ‘X-Content-Type-Options‘: ‘nosniff‘, ‘Referrer-Policy‘: ‘same-origin‘, ‘Strict-Transport-Security‘: ‘max-age=15724800; includeSubDomains‘, ‘Content-Encoding‘: ‘gzip‘)>
. . .
Get response from https://static4.scrape.cuiqingcai.com/ response <ClientResponse(https://static4.scrape.cuiqingcai.com/) [200 OK]>
<CIMultiDictProxy(‘Server‘: ‘nginx/1.17.8‘, ‘Date‘: ‘Wed, 14 Oct 2020 15:52:31 GMT‘, ‘Content-Type‘: ‘text/html; charset=utf-8‘, ‘Transfer-Encoding‘: ‘chunked‘, ‘Connection‘: ‘keep-alive‘, ‘Vary‘: ‘Accept-Encoding‘, ‘X-Frame-Options‘: ‘DENY‘, ‘X-Content-Type-Options‘: ‘nosniff‘, ‘Referrer-Policy‘: ‘same-origin‘, ‘Strict-Transport-Security‘: ‘max-age=15724800; includeSubDomains‘, ‘Content-Encoding‘: ‘gzip‘)>
Cost time: 5.596387624740601
成功了!我们发现这次请求的耗时由 50 秒变直接成了 5 秒多,耗费时间减少了非常非常多。
代码里面我们使用了 await
,后面跟了 get
方法,在执行这 10 个协程的时候,如果遇到了 await
,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。
开始运行时,时间循环会运行第一个 task
,针对第一个 task
来说,当执行到第一个 await
跟着的 get
方法时,它被挂起,但这个 get
方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession
对象,接着遇到了第二个 await
,调用了 session.get
请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒。
当第一个 task
被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起
的协程继续执行,于是就转而执行第二个 task
了,也是一样的流程操作,直到执行了第十个 task
的 session.get
方法之后,全部的 task
都被挂起了。所有 task
都已经处于挂起状态,怎么办?只好等待了。5 秒之后,几个请求几乎同时都有了响应,然后几个 task
也被唤醒接着执行,输出请求结果,最后总耗时,5.6 秒!
怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等待,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。
你可能会说,既然这样的话,在上面的例子中,在发出网络请求后,既然接下来的 5 秒都是在等待的,在 5 秒之内,CPU 可以处理的 task
数量远不止这些,那么岂不是我们放 10 个、20 个、50 个、100 个、1000 个 task
一起执行,最后得到所有结果的耗时不都是差不多的吗?因为这几个任务被挂起后都是一起等待的。
理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实可以做到无限 task
一起执行且在预想时间内得到结果。但由于不同服务器处理的实现机制不同,可能某些服务器并不能承受这么高的并发,因此响应速度也会减慢。
在这里我们以百度为例,来测试下并发数量为 1、3、5、10、...、500 的情况下的耗时情况,代码如下:
import asyncio
import aiohttp
import time
def test(num):
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = ‘https://www.baidu.com/‘
await get(url)
tasks = [asyncio.ensure_future(request()) for _ in range(num)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print(‘Number:‘, num, ‘Cost time:‘, end - start)
for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
test(number)
运行结果如下:
Number: 1 Cost time: 0.05885505676269531
Number: 3 Cost time: 0.05773782730102539
Number: 5 Cost time: 0.05768704414367676
Number: 10 Cost time: 0.15174412727355957
Number: 15 Cost time: 0.09603095054626465
Number: 30 Cost time: 0.17843103408813477
Number: 50 Cost time: 0.3741800785064697
Number: 75 Cost time: 0.2894289493560791
Number: 100 Cost time: 0.6185381412506104
Number: 200 Cost time: 1.0894129276275635
Number: 500 Cost time: 1.8213098049163818
可以看到,即使我们增加了并发数量,但在服务器能承受高并发的前提下,其爬取速度几乎不太受影响。
综上所述,使用了异步请求之后,我们几乎可以在相同的时间内实现成百上千倍次的网络请求,把这个运用在爬虫中,速度提升是非常可观的。
以上便是 Python 中协程的基本原理和用法。
原文:https://www.cnblogs.com/shisuizhe/p/13818204.html