自学内容网 自学内容网

异步爬虫基础

我们知道爬虫是 IO 密集型任务, 例如使用 requests 库来爬取某个站点,当发出一个请求后, 程序必须等待网站返回响应, 才能接着运行, 而在等待过程中,整个爬虫程序是一直在等待的。

协程的基本原理

案例的引入

在介绍案例之前, 先看一个网站, 地址为:

https://www.httpbin.org/delay/5

访问这个链接,需要先等待 5 秒才能得到结果, 这是因为服务器强制等待了 5 秒时间才返回响应

平时我们浏览网站,一般响应速度都是很快的,如果用爬虫,等待的时间也不长,然而像这个网站需要 5 秒以上才能拿到结果,如果我们直接遍历100 次案例网站,会是什么效果

import requests
import logging
import time

logging.basicConfig(level=logging.INFO, format='%(asctiem)s- %(levelname)s: %(message)s')
TOTAL_NUMBER = 100
URL = 'https://httpbin.org/delay/5'

start_time = time.time()
for _ in range(1, TOTAL_NUMBER +1 ):
    logging.info('scraping %s', URL)
    respons = requests.get(URL)
end_time = tiem.time()
logging.info('total tiem %s seconds', end_time -start_time)

这里我们直接用循环的方式构造了100 个请求, 使用的是 requests 单线程, 在爬取之前和爬取之后分别记录了时间, 最后输出了爬取 100 个页面消耗的总时间

由于每个页面都至少要等待5秒的时间,因此 100 个页面至少要消耗 500 秒时间,加上网站本身的负载问题, 总的爬取时间最终为663,大约 11 分钟

这在实际情况中是很常见的,这些网站的本身加载速度就比较慢, 稍慢的可能 1-3秒, 更慢的说不定 10 秒以上。 如果我们用 requests 单线程这么爬取, 总耗时会非常的大。 此时要是打开多线程或多进程来爬取, 其爬取速度会成倍的提升,是否有更好的方法? 答案是协程

基础知识

阻塞

阻塞状态是指程序未得到所需计算资源时被挂起的状态。 程序在等待某个操作完成期间, 自身无法继续干别的事情, 则该程序在操作上是阻塞的

常见的阻塞有:网络I/O阻塞, 用户输入阻塞等。包括在CPU切换上下文时,所有进程都无法真正干事情,它们也会被阻塞。在多核CPU的情况下, 正在执行上下文切换操作的核不可被利用

非阻塞

程序在等待其操作的过程中, 自身不被阻塞,可以继续干别的事情, 则称该程序在操作上是非阻塞的。

非阻塞并不是在任何程序级别,任何情况都存在的。 仅当程序封装的级别可以囊括独立的子程序单元时, 程序才可能存在非阻塞状态

非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才要把它变成非阻塞

同步

不同程序单元为了共同完成某个任务,在执行过程中需要靠某中通信方式保持协调一致,此时这些程序单元是同步执行的。

例如在购物系统中更新商品库存时,需要用 “行锁” 作为通信信号,强制让不同的更新请求排队,并按顺序执行, 这里的更新库存操作就是同步的

简而言之,同步意味着有序

异步

为了完成某个任务,有时不同程序单元之间无须通信协调也能完成任务,此时不相关的程序单元之间可以时异步的

例如,爬取下载网页。 调度程序调用下载程序后,即可调度其他任务, 无需与该下载任务保持通信以协调行为。不同网页的下载,保存等操作都是无关的,也无需相互通知 协调。这些异步操作完成时刻并不确定

异步意味着无序

多进程

多进程就是利用CPU多核优势,在同一时间并行执行多个任务,可以大大提高执行效率

协程

协程, 英文叫作 coroutine, 又称微线程,纤程,是一种运行在用户态的轻量级线程

协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切回来的时候,再恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态。

协程本质上是单个进程,相对于多进程来说,它没有线程上下文切换的开销,没有原子锁定及同步的开销,编程模型也非常简单

我们可以使用协程来实现异步操作,例如在网络爬虫的场景下,我们发出一个请求之后,需要等待一定时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他事情,等到响应之后再切回来继续处理,这样可以充分利用CPU和其他资源,这就是协程的优势

协程的用法

python 中使用协程最常用的库莫过于 asyncio 

首先了解下面几个概念

event_loop: 时间循环, 相当与一个无限循环,我们可以把一些函数注册到这个时间上,当满足发生条件的时候,就调用对应的处理方法

coroutine: 中文翻译叫做协程,在 python 中指代协程对象类型,我们可以将协程对象注册到事件循环中,它会被事件循环调用。我们可以使用 async 关键子定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象

task :任务,这是对协程对象的进一步封装,包含协程对象的各个状态。

future: 代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别

另外,我们还需要了解 async , await 关键字, 专门用于定义协程。前者用来定义一个协程,后者用来挂起阻塞方法的执行

定义协程

import asyncio
import nest_asyncio
nest_asyncio.apply()
async def execute(x):
    print('Number:',x)
coroutine = execute(1)
print('Coroutien:', coroutine)
print('After calling execure')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

Coroutien: <coroutine object execute at 0x00000199022ACC70>
After calling execure
Number: 1
After calling loop

 

首先我们引入了 asyncio 包,这样才可以使用 async 和 await 关键字,然后使用了 async 定义了一个 execute 方法,该方法接受一个数字参数 x , 执行之后会打印这个数字

随后我们直接调用了 execute 方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象,之后我们使用了get_event_loop 方法创建了一个事件循环 loop, 并调用了 loop 对象的 run_until_complete 方法将协程对象注册到了事件循环中,接着启动,最后我们才看到 execute 方法打印出了接收的数字。

可见, async 定义的方法会变成一个无法直接执行的协程对象,必须将此对象注册到事件循环中才可以执行

import nest_asyncio
nest_asyncio.apply()

这两行代码是因为在执行中会报错

这个报错,并不是一定有,暂时不知道原理

This event loop is already running

而加上去的

前面我们还提到了 task , 它是对协程对象的进一步封装,比协程对象多了运行状态,例如 running, finished等,我们可以利用这些状态获取协程对象的执行情况

在上面的例子中,当我们把协程对象 coroutine 传递给 run_until_complete 方法的时候,实际上它进行了一个操作,就是将 coroutine 封装成 task 对象,对此,我们也可以显示的进行声明

import asyncio

async def execute(x):
    print('Number:', x)
    return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

Coroutine: <coroutine object execute at 0x0000019906912190>
After calling execute
Task: <Task pending name='Task-7' coro=<execute() running at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1498524175.py:3>>
Number: 1
Task: <Task finished name='Task-7' coro=<execute() done, defined at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1498524175.py:3> result=1>
After calling loop

这里我们定义了 loop 对象之后,紧接着调用了它的 create_task 方法,将协程对象转化为 task 对象,随后打印输出了一下,发现它处于 pending 状态。 然后将 task 对象添加到时间循环中执行,并再次打印出 task 对象, 发现它的状态变成了 finished , 同事还可以看到 result 变成了 1 ,也就是我们定的 execute 方法返回的结果

定义 task 方法还有另外一种方式,就是直接调用 asyncio 包的 ensure_future 方法,返回结果也是task 对象,这样的话我们就可以不借助 loop 对象,即使还没有声明 loop,也可以提前定义好 task 对象

import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

task = asyncio.ensure_future(coroutine)
print('Task:',task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

Coroutine: <coroutine object execute at 0x0000019906E38790>
After calling execute
Task: <Task pending name='Task-9' coro=<execute() running at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1055771119.py:3>>
Number: 1
Task: <Task finished name='Task-9' coro=<execute() done, defined at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1055771119.py:3> result=1>
After calling loop

效果一样的

绑定回调

我们也可以为 task 对象绑定一个回调方法

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

Task: <Task pending name='Task-12' coro=<request() running at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1121094303.py:4> cb=[callback() at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1121094303.py:9]>
Task: <Task finished name='Task-12' coro=<request() done, defined at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\1121094303.py:4> result=<Response [200]>>
Status: <Response [200]>

这里我们定义了 request 方法, 在这个方法里请求了百度, 并获取了其状态码, 但是没有编写任何 print  语句, 随后我们定义 callback 方法, 这个方法接收了一个参数, 参数是 task 对象, 在这个方法里调用了 print 方法 打印出了 task 对象的结果。 这样就定义好了一个协程方法和一个回调方法。我们现在希望达到的效果是,当协程对象执行完毕后,就去执行声明的 callback 方法

那么两者怎样关联起来呢?很简单只要调用 add_done_callback 方法就行,我们将 callback 方法传递给封装好的 task 对象,这样当 task 执行完毕后, 就可以调用 callback 方法了。 同时 task 对象还会作为参数传递给 callback 方法,调用 task 对象的 result 方法就可以获取返回结果了

实际上,即使不使用回调方法, 在 task 运行完毕后,也可以直接调用 result 方法获取结果

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

Task: <Task pending name='Task-15' coro=<request() running at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\348552350.py:4>>
Task: <Task finished name='Task-15' coro=<request() done, defined at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\348552350.py:4> result=<Response [200]>>
Task Result: <Response [200]>

多任务协程

在上面的例子中,我们都只执行了一次请求,如果要执行多次请求该怎么做?

可以定义一个 task 列表, 然后使用 asyncio 包中的 wait 方法执行

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())

Task: <Task finished name='Task-15' coro=<request() done, defined at C:\Users\86151\AppData\Local\Temp\ipykernel_8608\348552350.py:4> result=<Response [200]>>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>

这里我们使用了一个 for 循环创建了 5 个 task 它们组成了一个列表,然后首先将这个列表传给 asyncio 包的 wait 方法, 再将其注册到事件循环中,就可以发起 5 个任务了, 最后输出任务的执行结果,就可以看到 5 个任务被以此执行

协程实现

了解前面的基础之后,现在来用最开始的例子,使用协程来完成它

首先看一个使用协程常犯的错误

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from : ', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Waiting for https://www.httpbin.org/delay/5
Get response from :  https://www.httpbin.org/delay/5 response <Response [200]>
Cost time: 66.84309554100037

 

这里我们创建了 10 个task ,然后将 tasks 列表传递给了 wait 方法并注册到事件循环中执行

可以发现,这和正常的请求没有什么区别,各个任务依然是顺序执行的,耗时 66 秒,平均一个请求耗时 6.6 秒。并没有实现异步

其实,想要实现异步操作,先得有挂起操作,当一个任务需要等待 I/O结果的时候,可以挂起当前的任务,转而执行其他任务,这样才能充分利用好资源。

要实现异步,我们再了解一下 await 关键字的用法,它可以将耗时等待的操作挂起,让出控制权。

如果协程再执行的时候遇到 await , 事件循环就会将本协程挂起,转而执行别的协程,直到其他协程挂起或执行完毕

 async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await requests.get(url)
    print('Get response from : ', url, 'response', response)

这里按照教程里是要报错的,但我这里并没有报错,

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

加上这些代码后并不能完整执行,说明还是有问题的

来看看教程说的问题

这次协程遇到 await 确实挂起了,也等待了,但是最后却报错了。这个错误的原因是 requests 返回的 response 对象不能和 awiat 一起使用,因为官方文档说明 await 后面的对象必须是如下格式之一

一个原生协程对象

一个由 type.coroutine 修饰的生成器,这个生成器可以返回协程对象

由一个包含 __await__ 方法的对象返回的一个迭代器

这里返回的 Response 对像以上三种都不符合,因此报错

如果我们直接将 async 后面的请求方法变成协程对象会怎样

import asyncio
import requests
import time

start = time.time()

async def get(url):
    return requests.get(url)

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asynico.wait(tasks))

end = time.time()
print('Cost time:', end - start)

答案是报错了,这里报错和教程不一样,不过懒得纠结

使用 aiohttp

aiohttp 是一个支持异步请求的库, 它和 asyncio 配合使用,可以使我们非常方便的实现异步请求操作

安装:  pip install aiohttp

官方链接: https://aiohttp.readthedocs.io/

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await session.close()
    return response

async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from',url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:', end - start)

Waiting for https://www.httpbin.org/delay/5
Waiting for https://www.httpbin.org/delay/5
Waiting for https://www.httpbin.org/delay/5
Waiting for https://www.httpbin.org/delay/5

..........

Cost time: 6.876450300216675

这次只有 6秒

这里我们使用了 await , 其后面跟着 get 方法, 在执行 10个协程的时候, 如果遇到 await , 就会将当前协程挂起,转而执行其他协程,直到其他协程也挂起或执行完毕,再执行下一个协程

开始运行时,事件循环会运行第一个 task 对于第一个 task 来说, 当执行到第一个 await 跟着的 get 方法时,他会被挂起,但这个 get 方法第一步的执行是非常阻塞的, 挂起之后会立马被唤醒,立即又进入执行,并创建了 ClientSession 对象。 接着遇到第二个 await , 调用 session.get 请求方法,然后就被挂起了。由于请求需要耗时很久,所以一直没有被唤醒,好在第一个 task 被挂起了,接下来,事件循环会寻找当前未被挂起的协程继续执行,然后转而去执行第二个 task ,流程操作和第一个 task 也是一样的,以此类推,知道第十个 task 的 session.get 方法之后,全部的 task 都被挂起了。 所有 task 都已经处于挂起状态,这时就只好等待了,5秒之后,几个请求几乎同时响应,然后几个 task 也被唤醒接着执行,并输出请求结果

在上面的例子中,发出网络请求后,接下来的 5 秒都是在等待,那么在这 5 秒之内, CPU 可以处理的 task 数量远远不止这些,既然这样的话,我们可以放的 task 能达到多少?

这就要取决于服务器的响应,IO传输的延迟,不同的服务器 处理 task 的实现机制不同

这里以百度为例

import asyncio
import aiohttp
import time 
def test(number):
    start = time.time()
    async def get(url):
        session = aiohttp.ClientSession()
        response = await session.get(url)
        await response.text()
        await session.close()
        return response

    async def request():
        url = 'https://www.baidu.com/'
        await get(url)
    
    tasks = [asyncio.ensure_future(request()) for _ in range(number)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    
    end = time.time()
    print('Number:', number, 'Cost time:', end - start)
    
for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
    test(number)

注意请求太多会被限制

在服务器能够承受高并发的情况下,即使我们增加了并发量,爬取速速也不会太受影响

 


原文地址:https://blog.csdn.net/qq_39217312/article/details/140686581

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!