python 基于aiohttp的異步爬蟲實戰(zhàn)

鋼鐵知識庫,一個學(xué)習(xí)python爬蟲、數(shù)據(jù)分析的知識庫。人生苦短,快用python。

在于都等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站建設(shè)、做網(wǎng)站 網(wǎng)站設(shè)計制作按需定制,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站制作,營銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,于都網(wǎng)站建設(shè)費用合理。

之前我們使用requests庫爬取某個站點的時候,每發(fā)出一個請求,程序必須等待網(wǎng)站返回響應(yīng)才能接著運行,而在整個爬蟲過程中,整個爬蟲程序是一直在等待的,實際上沒有做任何事情。

像這種占用磁盤/內(nèi)存IO、網(wǎng)絡(luò)IO的任務(wù),大部分時間是CPU在等待的操作,就叫IO密集型任務(wù)。對于這種情況有沒有優(yōu)化方案呢,當然有,那就是使用aiohttp庫實現(xiàn)異步爬蟲。

aiohttp是什么

我們在使用requests請求時,只能等一個請求先出去再回來,才會發(fā)送下一個請求。明顯效率不高阿,這時候如果換成異步請求的方式,就不會有這個等待。一個請求發(fā)出去,不管這個請求什么時間響應(yīng),程序通過await掛起協(xié)程對象后直接進行下一個請求。

解決方法就是通過 aiohttp + asyncio,什么是aiohttp?一個基于 asyncio 的異步 HTTP 網(wǎng)絡(luò)模塊,可用于實現(xiàn)異步爬蟲,速度明顯快于 requests 的同步爬蟲。

requests和aiohttp區(qū)別

區(qū)別就是一個同步一個是異步。話不多說直接上代碼看效果。

安裝aiohttp

pip install aiohttp
  • requests同步示例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: 鋼鐵知識庫
import time
import requests

# 同步請求
def main():
    start = time.time()
    for i in range(5):
        res = requests.get('http://httpbin.org/delay/2')
        print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status_code}')
    print(f'requests同步耗時:{time.time() - start}')
    
if __name__ == '__main__':
    main()

'''
當前時間:2022-09-05 15:44:51., status_code = 200
當前時間:2022-09-05 15:44:54., status_code = 200
當前時間:2022-09-05 15:44:57.0, status_code = 200
當前時間:2022-09-05 15:44:59., status_code = 200
當前時間:2022-09-05 15:45:02., status_code = 200
requests同步耗時:12.
'''

可以看到5次請求總共用12.7秒,再來看同樣的請求異步多少時間。

  • aiohttp異步示例:
#!/usr/bin/env python
# file: day6-9同步和異步.py
# author: 鋼鐵知識庫
import asyncio
import time
import aiohttp

async def async_http():
    # 聲明一個支持異步的上下文管理器
    async with aiohttp.ClientSession() as session:
        res = await session.get('http://httpbin.org/delay/2')
        print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status}')

tasks = [async_http() for _ in range(5)]
start = time.time()
# Python 3.7 及以后,不需要顯式聲明事件循環(huán),可以使用 asyncio.run()來代替最后的啟動操作
asyncio.run(asyncio.wait(tasks))
print(f'aiohttp異步耗時:{time.time() - start}')

'''
當前時間:2022-09-05 15:42:32., status_code = 200
當前時間:2022-09-05 15:42:32., status_code = 200
當前時間:2022-09-05 15:42:32., status_code = 200
當前時間:2022-09-05 15:42:32., status_code = 200
當前時間:2022-09-05 15:42:32., status_code = 200
aiohttp異步耗時:2.
'''

兩次對比可以看到執(zhí)行過程,時間一個是順序執(zhí)行,一個是同時執(zhí)行。這就是同步和異步的區(qū)別。

aiohttp使用介紹

接下來我們會詳細介紹aiohttp庫的用法和爬取實戰(zhàn)。aiohttp 是一個支持異步請求的庫,它和 asyncio 配合使用,可以使我們非常方便地實現(xiàn)異步請求操作。asyncio模塊,其內(nèi)部實現(xiàn)了對TCP、UDP、SSL協(xié)議的異步操作,但是對于HTTP請求,就需要aiohttp實現(xiàn)了。

aiohttp分為兩部分,一部分是Client,一部分是Server。下面來說說aiohttp客戶端部分的用法。

基本實例

先寫一個簡單的案例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import asyncio
import aiohttp

async def get_api(session, url):
    # 聲明一個支持異步的上下文管理器
    async with session.get(url) as response:
        return await response.text(), response.status

async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await get_api(session, 'http://httpbin.org/delay/2')
        print(f'html: {html[:50]}')
        print(f'status : {status}')

if __name__ == '__main__':
    #  Python 3.7 及以后,不需要顯式聲明事件循環(huán),可以使用 asyncio.run(main())來代替最后的啟動操作
    asyncio.get_event_loop().run_until_complete(main())
'''
html: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  
status : 200

Process finished with exit code 0
'''

aiohttp請求的方法和之前有明顯區(qū)別,主要包括如下幾點:

  1. 除了導(dǎo)入aiohttp庫,還必須引入asyncio庫,因為要實現(xiàn)異步,需要啟動協(xié)程。
  2. 異步的方法定義不同,前面都要統(tǒng)一加async來修飾。
  3. with as用于聲明上下文管理器,幫我們自動分配和釋放資源,加上async代碼支持異步。
  4. 對于返回協(xié)程對象的操作,前面需要加await來修飾。response.text()返回的是協(xié)程對象。
  5. 最后運行啟用循環(huán)事件

注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的啟動操作。

URL參數(shù)設(shè)置

對于URL參數(shù)的設(shè)置,我們可以借助params設(shè)置,傳入一個字典即可,實例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    params = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.httpbin.org/get', params=params) as res:
            print(await res.json())

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
{'args': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-e34-1acf7bde7a6dc72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=鋼鐵知識庫&age=23'}
'''

可以看到實際請求的URL后面帶了后綴,這就是params的內(nèi)容。

請求類型

除了get請求,aiohttp還支持其它請求類型,如POST、PUT、DELETE等,和requests使用方式類似。

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

要使用這些方法,只需要把對應(yīng)的方法和參數(shù)替換一下。用法和get類似就不再舉例。

響應(yīng)的幾個方法

對于響應(yīng)來說,我們可以用如下方法分別獲取其中的響應(yīng)情況。狀態(tài)碼、響應(yīng)頭、響應(yīng)體、響應(yīng)體二進制內(nèi)容、響應(yīng)體JSON結(jié)果,實例如下:

#!/usr/bin/env python
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    data = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.httpbin.org/post', data=data) as response:
            print('status:', response.status)  # 狀態(tài)碼
            print('headers:', response.headers)  # 響應(yīng)頭
            print('body:', await response.text())  # 響應(yīng)體
            print('bytes:', await response.read())  # 響應(yīng)體二進制內(nèi)容
            print('json:', await response.json())  # 響應(yīng)體json數(shù)據(jù)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
status: 200
headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {
    "age": "23", 
    "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "57", 
    "Content-Type": "application/x-www-form-urlencoded", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.8 aiohttp/3.8.1", 
    "X-Amzn-Trace-Id": "Root=1-dc-6aa1b2ba1a0481d06e1"
  }, 
  "json": null, 
  "origin": "122.55.11.188", 
  "url": "https://www.httpbin.org/post"
}

bytes: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "age": "23", \n    "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "57", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.8 aiohttp/3.8.1", \n    "X-Amzn-Trace-Id": "Root=1-dc-6aa1b2ba1a0481d06e1"\n  }, \n  "json": null, \n  "origin": "122.5.132.196", \n  "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-dc-6aa1b2ba1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
'''

可以看到有些字段前面需要加await,因為其返回的是一個協(xié)程對象(如async修飾的方法),那么前面就要加await。

超時設(shè)置

我們可以借助ClientTimeout對象設(shè)置超時,例如要設(shè)置1秒的超時時間,可以這么實現(xiàn):

#!/usr/bin/env python
# @Author  : 鋼鐵知識庫
import aiohttp
import asyncio

async def main():
    # 設(shè)置 1 秒的超時 
    timeout = aiohttp.ClientTimeout(total=1)
    data = {'name': '鋼鐵知識庫', 'age': 23}
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
            print('status:', response.status)  # 狀態(tài)碼

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
Traceback (most recent call last):
####中間省略####
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''

這里設(shè)置了超時1秒請求延時2秒,發(fā)現(xiàn)拋出異常asyncio.TimeoutError,如果正常則響應(yīng)200。

并發(fā)限制

aiohttp可以支持非常高的并發(fā)量,但面對高并發(fā)網(wǎng)站可能會承受不住,隨時有掛掉的危險,這時需要對并發(fā)進行一些控制。現(xiàn)在我們借助asyncio 的Semaphore來控制并發(fā)量,實例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
import asyncio
from datetime import datetime
import aiohttp

# 聲明最大并發(fā)量
semaphore = asyncio.Semaphore(2)

async def get_api():
    async with semaphore:
        print(f'scrapting...{datetime.now()}')
        async with session.get('https://www.baidu.com') as response:
            await asyncio.sleep(2)
            # print(f'當前時間:{datetime.now()}, {response.status}')

async def main():
    global session
    session = aiohttp.ClientSession()
    tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
    await asyncio.gather(*tasks)
    await session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
scrapting...2022-09-07 08:11:14.
scrapting...2022-09-07 08:11:14.
scrapting...2022-09-07 08:11:16.
scrapting...2022-09-07 08:11:16.
scrapting...2022-09-07 08:11:18.
scrapting...2022-09-07 08:11:18.
'''

在main方法里,我們聲明了1000個task,如果沒有通過Semaphore進行并發(fā)限制,那這1000放到gather方法后會被同時執(zhí)行,并發(fā)量相當大。有了信號量的控制之后,同時運行的task數(shù)量就會被控制,這樣就能給aiohttp限制速度了。

aiohttp異步爬取實戰(zhàn)

接下來我們通過異步方式練手一個小說爬蟲,需求如下:

需求頁面:https://dushu.baidu.com/pc/detail?gid=

目錄接口:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":""}

詳情接口:https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"","cid":"|"}

關(guān)鍵參數(shù):book_id:小說ID、cid:章節(jié)id

采集要求:使用協(xié)程方式寫入,數(shù)據(jù)存放進mongo

需求分析:點開需求頁面,通過F12抓包可以發(fā)現(xiàn)兩個接口。一個目錄接口,一個詳情接口。
首先第一步先請求目錄接口拿到cid章節(jié)id,然后將cid傳遞給詳情接口拿到小說數(shù)據(jù),最后存入mongo即可。

話不多說,直接上代碼:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 鋼鐵知識庫
# 不合適就是不合適,真正合適的,你不會有半點猶豫。
import asyncio
import json,re
import logging
import aiohttp
import requests
from utils.conn_db import ConnDb

# 日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# 章節(jié)目錄api
b_id = ''
url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/104.0.0.0 Safari/537.36"
}
# 并發(fā)聲明
semaphore = asyncio.Semaphore(5)

async def download(title,b_id, cid):
    data = {
        "book_id": b_id,
        "cid": f'{b_id}|{cid}',
    }
    data = json.dumps(data)
    detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
    async with semaphore:
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(detail_url) as response:
                res = await response.json()
                content = {
                    'title': title,
                    'content': res['data']['novel']['content']
                }
                # print(title)
                await save_data(content)

async def save_data(data):
    if data:
        client = ConnDb().conn_motor_mongo()
        db = client.baidu_novel
        collection = db.novel
        logging.info('saving data %s', data)
        await collection.update_one(
            {'title': data.get('title')},
            {'$set': data},
            upsert=True
        )

async def main():
    res = requests.get(url, headers=headers)
    tasks = []
    for re in res.json()['data']['novel']['items']:     # 拿到某小說目錄cid
        title = re['title']
        cid = re['cid']
        tasks.append(download(title, b_id, cid))    # 將請求放到列表里,再通過gather執(zhí)行并發(fā)
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

至此,我們就使用aiohttp完成了對小說章節(jié)的爬取。

要實現(xiàn)異步處理,得先要有掛起操作,當一個任務(wù)需要等待 IO 結(jié)果的時候,可以掛起當前任務(wù),轉(zhuǎn)而去執(zhí)行其他任務(wù),這樣才能充分利用好資源,要實現(xiàn)異步,需要了解 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權(quán)。當協(xié)程執(zhí)行的時候遇到 await,時間循環(huán)就會將本協(xié)程掛起,轉(zhuǎn)而去執(zhí)行別的協(xié)程,直到其他的協(xié)程掛起或執(zhí)行完畢。

await 后面的對象必須是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器可以返回 coroutine 對象。
  • An object with an await method returning an iterator,一個包含 await 方法的對象返回的一個迭代器。

---- 鋼鐵知識庫

總結(jié)

以上就是借助協(xié)程async和異步aiohttp兩個主要模塊完成異步爬蟲的內(nèi)容,
aiohttp 以異步方式爬取網(wǎng)站的耗時遠小于 requests 同步方式,以上列舉的例子希望對你有幫助。

注意,線程和協(xié)程是兩個概念,后面找機會我們再聊聊進程和線程、線程和協(xié)程的關(guān)系。

本文標題:python 基于aiohttp的異步爬蟲實戰(zhàn)
本文網(wǎng)址:http://m.kartarina.com/article18/dsogedp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序網(wǎng)站收錄網(wǎng)站設(shè)計公司網(wǎng)站策劃網(wǎng)站營銷Google

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

手機網(wǎng)站建設(shè)
主站蜘蛛池模板: 无码喷水一区二区浪潮AV| 亚洲日韩精品无码专区网站| 乱人伦人妻中文字幕无码久久网| 亚洲av永久中文无码精品| 国产成人无码AV麻豆| 精品无码久久久久久国产| 亚洲一区无码中文字幕| 成年男人裸j照无遮挡无码| 国产成人无码精品久久久免费| 久久久久久AV无码免费网站| 特级无码a级毛片特黄| 免费A级毛片无码视频| 国产日产欧洲无码视频无遮挡| 久久午夜夜伦鲁鲁片免费无码影视| 爽到高潮无码视频在线观看| 亚洲精品无码你懂的| 东京热人妻无码人av| 亚洲精品无码你懂的网站| 亚洲AV日韩AV无码污污网站| 亚洲AV人无码激艳猛片| 国产日产欧洲无码视频无遮挡 | 亚洲AV无码一区二区三区久久精品 | 国产精品ⅴ无码大片在线看| a级毛片无码免费真人| 无码福利一区二区三区| 亚洲中文久久精品无码| 国产aⅴ激情无码久久| 波多野42部无码喷潮在线| 久久久久亚洲AV无码去区首| 亚洲日韩精品无码AV海量| 日韩人妻无码一区二区三区久久| 中文字幕无码一区二区免费| 久久伊人中文无码| 潮喷大喷水系列无码久久精品| 国产精品无码一区二区三区在| 亚洲AV无码成人网站在线观看 | 一本色道无码不卡在线观看| 亚洲高清无码在线观看| 国产成人亚洲综合无码| 国产精品一级毛片无码视频| 亚洲无码日韩精品第一页|