python異步函數(shù) python調(diào)用異步函數(shù)

Python異步編程4:協(xié)程函數(shù),協(xié)程對(duì)象,await關(guān)鍵字

協(xié)程函數(shù):async def?函數(shù)名。3.5+

創(chuàng)新互聯(lián)建站是一家專(zhuān)業(yè)提供新密企業(yè)網(wǎng)站建設(shè),專(zhuān)注與成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、H5響應(yīng)式網(wǎng)站、小程序制作等業(yè)務(wù)。10年已為新密眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專(zhuān)業(yè)網(wǎng)站制作公司優(yōu)惠進(jìn)行中。

協(xié)程對(duì)象:執(zhí)行協(xié)程函數(shù)()得到的協(xié)程對(duì)象。

3.5之后的寫(xiě)法:

3.7之后的寫(xiě)法:更簡(jiǎn)便

await后面?跟?可等待的對(duì)象。(協(xié)程對(duì)象,F(xiàn)uture,Task對(duì)象?約等于IO等待)

await實(shí)例2:串行執(zhí)行。 一個(gè)協(xié)程函數(shù)里面可以支持多個(gè)await ,雖然會(huì)串行,但是如果有其他協(xié)程函數(shù),任務(wù)列表也在執(zhí)行,依然會(huì)切換。只是案例中的main對(duì)應(yīng)執(zhí)行的others1和others2串行 。 await會(huì)等待對(duì)象的值得到之后才繼續(xù)往下走。

Ppython await是什么?

await的解釋?zhuān)?/p>

await用來(lái)聲明程序掛起。

比如異步程序執(zhí)行到某一步時(shí)需要等待的時(shí)間很長(zhǎng),就將此掛起,去執(zhí)行其他的異步程序。

await 后面只能跟異步程序或有__await__屬性的對(duì)象,因?yàn)楫惒匠绦蚺c一般程序不同。

程序解釋?zhuān)?/p>

假設(shè)有兩個(gè)異步函數(shù)async a,async b,a中的某一步有await,

當(dāng)程序碰到關(guān)鍵字await b()后,異步程序掛起后去執(zhí)行另一個(gè)異步b程序,就是從函數(shù)內(nèi)部跳出去執(zhí)行其他函數(shù),

當(dāng)掛起條件消失后,不管b是否執(zhí)行完,要馬上從b程序中跳出來(lái),回到原程序執(zhí)行原來(lái)的操作。

如果await后面跟的b函數(shù)不是異步函數(shù),那么操作就只能等b執(zhí)行完再返回,無(wú)法在b執(zhí)行的過(guò)程中返回。

如果要在b執(zhí)行完才返回,也就不需要用await關(guān)鍵字了,直接調(diào)用b函數(shù)就行。

所以這就需要await后面跟的是異步函數(shù)了。

在一個(gè)異步函數(shù)中,可以不止一次掛起,也就是可以用多個(gè)await。

更多Python知識(shí),請(qǐng)關(guān)注:Python自學(xué)網(wǎng)!!

python異步有哪些方式

yield相當(dāng)于return,他將相應(yīng)的值返回給調(diào)用next()或者send()的調(diào)用者,從而交出了CPU使用權(quán),而當(dāng)調(diào)用者再次調(diào)用next()或者send()的時(shí)候,又會(huì)返回到y(tǒng)ield中斷的地方,如果send有參數(shù),還會(huì)將參數(shù)返回給yield賦值的變量,如果沒(méi)有就和next()一樣賦值為None。但是這里會(huì)遇到一個(gè)問(wèn)題,就是嵌套使用generator時(shí)外層的generator需要寫(xiě)大量代碼,看如下示例:?

注意以下代碼均在Python3.6上運(yùn)行調(diào)試

#!/usr/bin/env python# encoding:utf-8def inner_generator():

i = 0

while True:

i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():

print("do something before yield")

from_inner = 0

from_outer = 1

g = inner_generator()

g.send(None) ? ?while 1: ? ? ? ?try:

from_inner = g.send(from_outer)

from_outer = yield from_inner ? ? ? ?except StopIteration: ? ? ? ? ? ?breakdef main():

g = outer_generator()

g.send(None)

i = 0

while 1: ? ? ? ?try:

i = g.send(i + 1)

print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':

main()1234567891011121314151617181920212223242526272829303132333435363738394041

為了簡(jiǎn)化,在Python3.3中引入了yield from

yield from

使用yield from有兩個(gè)好處,

1、可以將main中send的參數(shù)一直返回給最里層的generator,?

2、同時(shí)我們也不需要再使用while循環(huán)和send (), next()來(lái)進(jìn)行迭代。

我們可以將上邊的代碼修改如下:

def inner_generator():

i = 0

while True:

i = yield i ? ? ? ?if i 10: ? ? ? ? ? ?raise StopIterationdef outer_generator():

print("do something before coroutine start") ? ?yield from inner_generator()def main():

g = outer_generator()

g.send(None)

i = 0

while 1: ? ? ? ?try:

i = g.send(i + 1)

print(i) ? ? ? ?except StopIteration: ? ? ? ? ? ?breakif __name__ == '__main__':

main()1234567891011121314151617181920212223242526

執(zhí)行結(jié)果如下:

do something before coroutine start123456789101234567891011

這里inner_generator()中執(zhí)行的代碼片段我們實(shí)際就可以認(rèn)為是協(xié)程,所以總的來(lái)說(shuō)邏輯圖如下:?

接下來(lái)我們就看下究竟協(xié)程是啥樣子

協(xié)程coroutine

協(xié)程的概念應(yīng)該是從進(jìn)程和線程演變而來(lái)的,他們都是獨(dú)立的執(zhí)行一段代碼,但是不同是線程比進(jìn)程要輕量級(jí),協(xié)程比線程還要輕量級(jí)。多線程在同一個(gè)進(jìn)程中執(zhí)行,而協(xié)程通常也是在一個(gè)線程當(dāng)中執(zhí)行。它們的關(guān)系圖如下:

我們都知道Python由于GIL(Global Interpreter Lock)原因,其線程效率并不高,并且在*nix系統(tǒng)中,創(chuàng)建線程的開(kāi)銷(xiāo)并不比進(jìn)程小,因此在并發(fā)操作時(shí),多線程的效率還是受到了很大制約的。所以后來(lái)人們發(fā)現(xiàn)通過(guò)yield來(lái)中斷代碼片段的執(zhí)行,同時(shí)交出了cpu的使用權(quán),于是協(xié)程的概念產(chǎn)生了。在Python3.4正式引入了協(xié)程的概念,代碼示例如下:

import asyncio# Borrowed from countdown(number, n):

while n 0:

print('T-minus', n, '({})'.format(number)) ? ? ? ?yield from asyncio.sleep(1)

n -= 1loop = asyncio.get_event_loop()

tasks = [

asyncio.ensure_future(countdown("A", 2)),

asyncio.ensure_future(countdown("B", 3))]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()12345678910111213141516

示例顯示了在Python3.4引入兩個(gè)重要概念協(xié)程和事件循環(huán),?

通過(guò)修飾符@asyncio.coroutine定義了一個(gè)協(xié)程,而通過(guò)event loop來(lái)執(zhí)行tasks中所有的協(xié)程任務(wù)。之后在Python3.5引入了新的async await語(yǔ)法,從而有了原生協(xié)程的概念。

async await

在Python3.5中,引入了ayncawait 語(yǔ)法結(jié)構(gòu),通過(guò)”aync def”可以定義一個(gè)協(xié)程代碼片段,作用類(lèi)似于Python3.4中的@asyncio.coroutine修飾符,而await則相當(dāng)于”yield from”。

先來(lái)看一段代碼,這個(gè)是我剛開(kāi)始使用asyncawait語(yǔ)法時(shí),寫(xiě)的一段小程序。

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time

async def wait_download(url):

response = await requets.get(url)

print("get {} response complete.".format(url))

async def main():

start = time.time()

await asyncio.wait([

wait_download(""),

wait_download(""),

wait_download("")])

end = time.time()

print("Complete in {} seconds".format(end - start))

loop = asyncio.get_event_loop()

loop.run_until_complete(main())12345678910111213141516171819202122232425

這里會(huì)收到這樣的報(bào)錯(cuò):

Task exception was never retrieved

future: Task finished coro=wait_download() done, defined at asynctest.py:9 exception=TypeError("object Response can't be used in 'await' expression",)

Traceback (most recent call last):

File "asynctest.py", line 10, in wait_download

data = await requests.get(url)

TypeError: object Response can't be used in 'await' expression123456

這是由于requests.get()函數(shù)返回的Response對(duì)象不能用于await表達(dá)式,可是如果不能用于await,還怎么樣來(lái)實(shí)現(xiàn)異步呢??

原來(lái)Python的await表達(dá)式是類(lèi)似于”yield from”的東西,但是await會(huì)去做參數(shù)檢查,它要求await表達(dá)式中的對(duì)象必須是awaitable的,那啥是awaitable呢? awaitable對(duì)象必須滿足如下條件中其中之一:

1、A native coroutine object returned from a native coroutine function .

原生協(xié)程對(duì)象

2、A generator-based coroutine object returned from a function decorated with types.coroutine() .

types.coroutine()修飾的基于生成器的協(xié)程對(duì)象,注意不是Python3.4中asyncio.coroutine

3、An object with an await method returning an iterator.

實(shí)現(xiàn)了await method,并在其中返回了iterator的對(duì)象

根據(jù)這些條件定義,我們可以修改代碼如下:

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time

async def download(url): # 通過(guò)async def定義的函數(shù)是原生的協(xié)程對(duì)象

response = requests.get(url)

print(response.text)

async def wait_download(url):

await download(url) # 這里download(url)就是一個(gè)原生的協(xié)程對(duì)象

print("get {} data complete.".format(url))

async def main():

start = time.time()

await asyncio.wait([

wait_download(""),

wait_download(""),

wait_download("")])

end = time.time()

print("Complete in {} seconds".format(end - start))

loop = asyncio.get_event_loop()

loop.run_until_complete(main())123456789101112131415161718192021222324252627282930

好了現(xiàn)在一個(gè)真正的實(shí)現(xiàn)了異步編程的小程序終于誕生了。?

而目前更牛逼的異步是使用uvloop或者pyuv,這兩個(gè)最新的Python庫(kù)都是libuv實(shí)現(xiàn)的,可以提供更加高效的event loop。

uvloop和pyuv

pyuv實(shí)現(xiàn)了Python2.x和3.x,但是該項(xiàng)目在github上已經(jīng)許久沒(méi)有更新了,不知道是否還有人在維護(hù)。?

uvloop只實(shí)現(xiàn)了3.x, 但是該項(xiàng)目在github上始終活躍。

它們的使用也非常簡(jiǎn)單,以u(píng)vloop為例,只需要添加以下代碼就可以了

import asyncioimport uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123

Python異步編程全攻略

如果你厭倦了多線程,不妨試試python的異步編程,再引入async, await關(guān)鍵字之后語(yǔ)法變得更加簡(jiǎn)潔和直觀,又經(jīng)過(guò)幾年的生態(tài)發(fā)展,現(xiàn)在是一個(gè)很不錯(cuò)的并發(fā)模型。

下面介紹一下python異步編程的方方面面。

因?yàn)镚IL的存在,所以Python的多線程在CPU密集的任務(wù)下顯得無(wú)力,但是對(duì)于IO密集的任務(wù),多線程還是足以發(fā)揮多線程的優(yōu)勢(shì)的,而異步也是為了應(yīng)對(duì)IO密集的任務(wù),所以兩者是一個(gè)可以相互替代的方案,因?yàn)樵O(shè)計(jì)的不同,理論上異步要比多線程快,因?yàn)楫惒降幕ㄤN(xiāo)更少, 因?yàn)椴恍枰~外系統(tǒng)申請(qǐng)額外的內(nèi)存,而線程的創(chuàng)建跟系統(tǒng)有關(guān),需要分配一定量的內(nèi)存,一般是幾兆,比如linux默認(rèn)是8MB。

雖然異步很好,比如可以使用更少的內(nèi)存,比如更好地控制并發(fā)(也許你并不這么認(rèn)為:))。但是由于async/await 語(yǔ)法的存在導(dǎo)致與之前的語(yǔ)法有些割裂,所以需要適配,需要付出額外的努力,再者就是生態(tài)遠(yuǎn)遠(yuǎn)沒(méi)有同步編程強(qiáng)大,比如很多庫(kù)還不支持異步,所以你需要一些額外的適配。

為了不給其他網(wǎng)站帶來(lái)困擾,這里首先在自己電腦啟動(dòng)web服務(wù)用于測(cè)試,代碼很簡(jiǎn)單。

本文所有依賴(lài)如下:

所有依賴(lài)可通過(guò)代碼倉(cāng)庫(kù)的requirements.txt一次性安裝。

首先看一個(gè)錯(cuò)誤的例子

輸出如下:

發(fā)現(xiàn)花費(fèi)了3秒,不符合預(yù)期呀。。。。這是因?yàn)殡m然用了協(xié)程,但是每個(gè)協(xié)程是串行的運(yùn)行,也就是說(shuō)后一個(gè)等前一個(gè)完成之后才開(kāi)始,那么這樣的異步代碼并沒(méi)有并發(fā),所以我們需要讓這些協(xié)程并行起來(lái)

為了讓代碼變動(dòng)的不是太多,所以這里用了一個(gè)笨辦法來(lái)等待所有任務(wù)完成, 之所以在main函數(shù)中等待是為了不讓ClientSession關(guān)閉, 如果你移除了main函數(shù)中的等待代碼會(huì)發(fā)現(xiàn)報(bào)告異常 RuntimeError: Session is closed ,而代碼里的解決方案非常的不優(yōu)雅,需要手動(dòng)的等待,為了解決這個(gè)問(wèn)題,我們?cè)俅胃倪M(jìn)代碼。

這里解決的方式是通過(guò) asyncio.wait 方法等待一個(gè)協(xié)程列表,默認(rèn)是等待所有協(xié)程結(jié)束后返回,會(huì)返回一個(gè)完成(done)列表,以及一個(gè)待辦(pending)列表。

如果我們不想要協(xié)程對(duì)象而是結(jié)果,那么我們可以使用 asyncio.gather

結(jié)果輸出如下:

通過(guò) asyncio.ensure_future 我們就能創(chuàng)建一個(gè)協(xié)程,跟調(diào)用一個(gè)函數(shù)差別不大,為了等待所有任務(wù)完成之后退出,我們需要使用 asyncio.wait 等方法來(lái)等待,如果只想要協(xié)程輸出的結(jié)果,我們可以使用 asyncio.gather 來(lái)獲取結(jié)果。

雖然前面能夠隨心所欲的創(chuàng)建協(xié)程,但是就像多線程一樣,我們也需要處理協(xié)程之間的同步問(wèn)題,為了保持語(yǔ)法及使用情況的一致,多線程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用異步的關(guān)鍵字,比如 async with/ await 等

通過(guò)鎖讓并發(fā)慢下來(lái),讓協(xié)程一個(gè)一個(gè)的運(yùn)行。

輸出如下:

通過(guò)觀察很容易發(fā)現(xiàn),并發(fā)的速度因?yàn)殒i而慢下來(lái)了,因?yàn)槊看沃挥幸粋€(gè)協(xié)程能獲得鎖,所以并發(fā)變成了串行。

通過(guò)事件來(lái)通知特定的協(xié)程開(kāi)始工作,假設(shè)有一個(gè)任務(wù)是根據(jù)http響應(yīng)結(jié)果選擇是否激活。

輸出如下:

可以看到事件(Event)等待者都是在得到響應(yīng)內(nèi)容之后輸出,并且事件(Event)可以是多個(gè)協(xié)程同時(shí)等待。

上面的事件雖然很棒,能夠在不同的協(xié)程之間同步狀態(tài),并且也能夠一次性同步所有的等待協(xié)程,但是還不夠精細(xì)化,比如想通知指定數(shù)量的等待協(xié)程,這個(gè)時(shí)候Event就無(wú)能為力了,所以同步原語(yǔ)中出現(xiàn)了Condition。

輸出如下:

可以看到,前面兩個(gè)等待的協(xié)程是在同一時(shí)刻完成,而不是全部等待完成。

通過(guò)創(chuàng)建協(xié)程的數(shù)量來(lái)控制并發(fā)并不是非常優(yōu)雅的方式,所以可以通過(guò)信號(hào)量的方式來(lái)控制并發(fā)。

輸出如下:

可以發(fā)現(xiàn),雖然同時(shí)創(chuàng)建了三個(gè)協(xié)程,但是同一時(shí)刻只有兩個(gè)協(xié)程工作,而另外一個(gè)協(xié)程需要等待一個(gè)協(xié)程讓出信號(hào)量才能運(yùn)行。

無(wú)論是協(xié)程還是線程,任務(wù)之間的狀態(tài)同步還是很重要的,所以有了應(yīng)對(duì)各種同步機(jī)制的同步原語(yǔ),因?yàn)橐WC一個(gè)資源同一個(gè)時(shí)刻只能一個(gè)任務(wù)訪問(wèn),所以引入了鎖,又因?yàn)樾枰粋€(gè)任務(wù)等待另一個(gè)任務(wù),或者多個(gè)任務(wù)等待某個(gè)任務(wù),因此引入了事件(Event),但是為了更精細(xì)的控制通知的程度,所以又引入了條件(Condition), 通過(guò)條件可以控制一次通知多少的任務(wù)。

有時(shí)候的并發(fā)需求是通過(guò)一個(gè)變量控制并發(fā)任務(wù)的并發(fā)數(shù)而不是通過(guò)創(chuàng)建協(xié)程的數(shù)量來(lái)控制并發(fā),所以引入了信號(hào)量(Semaphore),這樣就可以在創(chuàng)建的協(xié)程數(shù)遠(yuǎn)遠(yuǎn)大于并發(fā)數(shù)的情況下讓協(xié)程在指定的并發(fā)量情況下并發(fā)。

不得不承認(rèn)異步編程相比起同步編程的生態(tài)要小的很多,所以不可能完全異步編程,因此需要一種方式兼容。

多線程是為了兼容同步得代碼。

多進(jìn)程是為了利用CPU多核的能力。

輸出如下:

可以看到總耗時(shí)1秒,說(shuō)明所有的線程跟進(jìn)程是同時(shí)運(yùn)行的。

下面是本人使用過(guò)的一些異步庫(kù),僅供參考

web框架

http客戶端

數(shù)據(jù)庫(kù)

ORM

雖然異步庫(kù)發(fā)展得還算不錯(cuò),但是中肯的說(shuō)并沒(méi)有覆蓋方方面面。

雖然我鼓勵(lì)大家嘗試異步編程,但是本文的最后卻是讓大家謹(jǐn)慎的選擇開(kāi)發(fā)環(huán)境,如果你覺(jué)得本文的并發(fā),同步,兼容多線程,多進(jìn)程不值得一提,那么我十分推薦你嘗試以異步編程的方式開(kāi)始一個(gè)新的項(xiàng)目,如果你對(duì)其中一些還有疑問(wèn)或者你確定了要使用的依賴(lài)庫(kù)并且大多數(shù)是沒(méi)有異步庫(kù)替代的,那么我還是建議你直接按照自己擅長(zhǎng)的同步編程開(kāi)始。

異步編程雖然很不錯(cuò),不過(guò),也許你并不需要。

Python 異步任務(wù)隊(duì)列Celery 使用

在 Python 中定義 Celery 的時(shí)候,我們要引入 Broker,中文翻譯過(guò)來(lái)就是“中間人”的意思。在工頭(生產(chǎn)者)提出任務(wù)的時(shí)候,把所有的任務(wù)放到 Broker 里面,在 Broker 的另外一頭,一群碼農(nóng)(消費(fèi)者)等著取出一個(gè)個(gè)任務(wù)準(zhǔn)備著手做。這種模式注定了整個(gè)系統(tǒng)會(huì)是個(gè)開(kāi)環(huán)系統(tǒng),工頭對(duì)于碼農(nóng)們把任務(wù)做的怎樣是不知情的。所以我們要引入 Backend 來(lái)保存每次任務(wù)的結(jié)果。這個(gè) Backend 也是存儲(chǔ)任務(wù)的信息用的,只不過(guò)這里存的是那些任務(wù)的返回結(jié)果。我們可以選擇只讓錯(cuò)誤執(zhí)行的任務(wù)返回結(jié)果到 Backend,這樣我們?nèi)』亟Y(jié)果,便可以知道有多少任務(wù)執(zhí)行失敗了。

其實(shí)現(xiàn)架構(gòu)如下圖所示:

可以看到,Celery 主要包含以下幾個(gè)模塊:

celery可以通過(guò)pip自動(dòng)安裝。

broker 可選擇使用RabbitMQ/redis,backend可選擇使用RabbitMQ/redis/MongoDB。RabbitMQ/redis/mongoDB的安裝請(qǐng)參考對(duì)應(yīng)的官方文檔。

------------------------------rabbitmq相關(guān)----------------------------------------------------------

官網(wǎng)安裝方法:

啟動(dòng)管理插件:sbin/rabbitmq-plugins enable rabbitmq_management 啟動(dòng)rabbitmq:sbin/rabbitmq-server -detached

rabbitmq已經(jīng)啟動(dòng),可以打開(kāi)頁(yè)面來(lái)看看 地址:

用戶名密碼都是guest 。進(jìn)入可以看到具體頁(yè)面。 關(guān)于rabbitmq的配置,網(wǎng)上很多 自己去搜以下就ok了。

------------------------------rabbitmq相關(guān)--------------------------------------------------------

項(xiàng)目結(jié)構(gòu)如下:

使用前,需要三個(gè)方面:celery配置,celery實(shí)例,需執(zhí)行的任務(wù)函數(shù),如下:

Celery 的配置比較多,可以在 官方配置文檔: 查詢每個(gè)配置項(xiàng)的含義。

當(dāng)然,要保證上述異步任務(wù)and下述定時(shí)任務(wù)都能正常執(zhí)行,就需要先啟動(dòng)celery worker,啟動(dòng)命令行如下:

需 啟動(dòng)beat ,執(zhí)行定時(shí)任務(wù)時(shí), Celery會(huì)通過(guò)celery beat進(jìn)程來(lái)完成。Celery beat會(huì)保持運(yùn)行, 一旦到了某一定時(shí)任務(wù)需要執(zhí)行時(shí), Celery beat便將其加入到queue中. 不像worker進(jìn)程, Celery beat只需要一個(gè)即可。而且為了避免有重復(fù)的任務(wù)被發(fā)送出去,所以Celery beat僅能有一個(gè)。

命令行啟動(dòng):

如果你想將celery worker/beat要放到后臺(tái)運(yùn)行,推薦可以扔給supervisor。

supervisor.conf如下:

python2.7怎么實(shí)現(xiàn)異步

改進(jìn)之前

之前,我的查詢步驟很簡(jiǎn)單,就是:

前端提交查詢請(qǐng)求 -- 建立數(shù)據(jù)庫(kù)連接 -- 新建游標(biāo) -- 執(zhí)行命令 -- 接受結(jié)果 -- 關(guān)閉游標(biāo)、連接

這幾大步驟的順序執(zhí)行。

這里面當(dāng)然問(wèn)題很大:

建立數(shù)據(jù)庫(kù)連接實(shí)際上就是新建一個(gè)套接字。這是進(jìn)程間通信的幾種方法里,開(kāi)銷(xiāo)最大的了。

在“執(zhí)行命令”和“接受結(jié)果”兩個(gè)步驟中,線程在阻塞在數(shù)據(jù)庫(kù)內(nèi)部的運(yùn)行過(guò)程中,數(shù)據(jù)庫(kù)連接和游標(biāo)都處于閑置狀態(tài)。

這樣一來(lái),每一次查詢都要順序的新建數(shù)據(jù)庫(kù)連接,都要阻塞在數(shù)據(jù)庫(kù)返回結(jié)果的過(guò)程中。當(dāng)前端提交大量查詢請(qǐng)求時(shí),查詢效率肯定是很低的。

第一次改進(jìn)

之前的模塊里,問(wèn)題最大的就是第一步——建立數(shù)據(jù)庫(kù)連接套接字了。如果能夠一次性建立連接,之后查詢能夠反復(fù)服用這個(gè)連接就好了。

所以,首先應(yīng)該把數(shù)據(jù)庫(kù)查詢模塊作為一個(gè)單獨(dú)的守護(hù)進(jìn)程去執(zhí)行,而前端app作為主進(jìn)程響應(yīng)用戶的點(diǎn)擊操作。那么兩條進(jìn)程怎么傳遞消息呢?翻了幾天Python文檔,終于構(gòu)思出來(lái):用隊(duì)列queue作為生產(chǎn)者(web前端)向消費(fèi)者(數(shù)據(jù)庫(kù)后端)傳遞任務(wù)的渠道。生產(chǎn)者,會(huì)與SQL命令一起,同時(shí)傳遞一個(gè)管道pipe的連接對(duì)象,作為任務(wù)完成后,回傳結(jié)果的渠道。確保,任務(wù)的接收方與發(fā)送方保持一致。

作為第二個(gè)問(wèn)題的解決方法,可以使用線程池來(lái)并發(fā)獲取任務(wù)隊(duì)列中的task,然后執(zhí)行命令并回傳結(jié)果。

第二次改進(jìn)

第一次改進(jìn)的效果還是很明顯的,不用任何測(cè)試手段。直接點(diǎn)擊頁(yè)面鏈接,可以很直觀地感覺(jué)到反應(yīng)速度有很明顯的加快。

但是對(duì)于第二個(gè)問(wèn)題,使用線程池還是有些欠妥當(dāng)。因?yàn)椋珻Python解釋器存在GIL問(wèn)題,所有線程實(shí)際上都在一個(gè)解釋器進(jìn)程里調(diào)度。線程稍微開(kāi)多一點(diǎn),解釋器進(jìn)程就會(huì)頻繁的切換線程,而線程切換的開(kāi)銷(xiāo)也不小。線程多一點(diǎn),甚至?xí)霈F(xiàn)“抖動(dòng)”問(wèn)題(也就是剛剛喚醒一個(gè)線程,就進(jìn)入掛起狀態(tài),剛剛換到棧幀或內(nèi)存的上下文,又被換回內(nèi)存或者磁盤(pán)),效率大大降低。也就是說(shuō),線程池的并發(fā)量很有限。

試過(guò)了多進(jìn)程、多線程,只能在單個(gè)線程里做文章了。

Python中的asyncio庫(kù)

Python里有大量的協(xié)程庫(kù)可以實(shí)現(xiàn)單線程內(nèi)的并發(fā)操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫(kù)同樣可以實(shí)現(xiàn)協(xié)程并發(fā)。asyncio庫(kù)大大降低了Python中協(xié)程的實(shí)現(xiàn)難度,就像定義普通函數(shù)那樣就可以了,只是要在def前面多加一個(gè)async關(guān)鍵詞。async def函數(shù)中,需要阻塞在其他async def函數(shù)的位置前面可以加上await關(guān)鍵詞。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函數(shù)的執(zhí)行稍微麻煩點(diǎn)。需要首先獲取一個(gè)loop對(duì)象,然后由這個(gè)對(duì)象代為執(zhí)行async def函數(shù)。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在執(zhí)行execute(task)函數(shù)時(shí),如果遇到await關(guān)鍵字,就會(huì)暫時(shí)掛起當(dāng)前協(xié)程,轉(zhuǎn)而去執(zhí)行其他阻塞在await關(guān)鍵詞的協(xié)程,從而實(shí)現(xiàn)協(xié)程并發(fā)。

不過(guò)需要注意的是,run_until_complete()函數(shù)本身是一個(gè)阻塞函數(shù)。也就是說(shuō),當(dāng)前線程會(huì)等候一個(gè)run_until_complete()函數(shù)執(zhí)行完畢之后,才會(huì)繼續(xù)執(zhí)行下一部函數(shù)。所以下面這段代碼并不能并發(fā)執(zhí)行。

for task in task_list:

loop.run_until_complete(task)

對(duì)與這個(gè)問(wèn)題,asyncio庫(kù)也有相應(yīng)的解決方案:gather函數(shù)。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

當(dāng)然了,async def函數(shù)的執(zhí)行并不只有這兩種解決方案,還有call_soon與run_forever的配合執(zhí)行等等,更多內(nèi)容還請(qǐng)參考官方文檔。

Python下的I/O多路復(fù)用

協(xié)程,實(shí)際上,也存在上下文切換,只不過(guò)開(kāi)銷(xiāo)很輕微。而I/O多路復(fù)用則完全不存在這個(gè)問(wèn)題。

目前,Linux上比較火的I/O多路復(fù)用API要算epoll了。Tornado,就是通過(guò)調(diào)用C語(yǔ)言封裝的epoll庫(kù),成功解決了C10K問(wèn)題(當(dāng)然還有Pypy的功勞)。

在Linux里查文檔,可以看到epoll只有三類(lèi)函數(shù),調(diào)用起來(lái)比較方便易懂。

創(chuàng)建epoll對(duì)象,并返回其對(duì)應(yīng)的文件描述符(file descriptor)。

int epoll_create(int size);

int epoll_create1(int flags);

控制監(jiān)聽(tīng)事件。第一個(gè)參數(shù)epfd就對(duì)應(yīng)于前面命令創(chuàng)建的epoll對(duì)象的文件描述符;第二個(gè)參數(shù)表示該命令要執(zhí)行的動(dòng)作:監(jiān)聽(tīng)事件的新增、修改或者刪除;第三個(gè)參數(shù),是要監(jiān)聽(tīng)的文件對(duì)應(yīng)的描述符;第四個(gè),代表要監(jiān)聽(tīng)的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候。這是一個(gè)阻塞函數(shù),調(diào)用者會(huì)等候內(nèi)核通知所注冊(cè)的事件被觸發(fā)。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select庫(kù)里:

select.epoll()對(duì)應(yīng)于第一類(lèi)創(chuàng)建函數(shù);

epoll.register(),epoll.unregister(),epoll.modify()均是對(duì)控制函數(shù)epoll_ctl的封裝;

epoll.poll()則是對(duì)等候函數(shù)epoll_wait的封裝。

Python里epoll相關(guān)API的最大問(wèn)題應(yīng)該是在epoll.poll()。相比于其所封裝的epoll_wait,用戶無(wú)法手動(dòng)指定要等候的事件,也就是后者的第二個(gè)參數(shù)struct epoll_event *events。沒(méi)法實(shí)現(xiàn)精確控制。因此只能使用替代方案:select.select()函數(shù)。

根據(jù)Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對(duì)Unix系統(tǒng)中select函數(shù)的直接調(diào)用,與C語(yǔ)言API的傳參很接近。前三個(gè)參數(shù)都是列表,其中的元素都是要注冊(cè)到內(nèi)核的文件描述符。如果想用自定義類(lèi),就要確保實(shí)現(xiàn)了fileno()方法。

其分別對(duì)應(yīng)于:

rlist: 等候直到可讀

wlist: 等候直到可寫(xiě)

xlist: 等候直到異常。這個(gè)異常的定義,要查看系統(tǒng)文檔。

select.select(),類(lèi)似于epoll.poll(),先注冊(cè)文件和事件,然后保持等候內(nèi)核通知,是阻塞函數(shù)。

實(shí)際應(yīng)用

Psycopg2庫(kù)支持對(duì)異步和協(xié)程,但和一般情況下的用法略有區(qū)別。普通數(shù)據(jù)庫(kù)連接支持不同線程中的不同游標(biāo)并發(fā)查詢;而異步連接則不支持不同游標(biāo)的同時(shí)查詢。所以異步連接的不同游標(biāo)之間必須使用I/O復(fù)用方法來(lái)協(xié)調(diào)調(diào)度。

所以,我的大致實(shí)現(xiàn)思路是這樣的:首先并發(fā)執(zhí)行大量協(xié)程,從任務(wù)隊(duì)列中提取任務(wù),再向連接池請(qǐng)求連接,創(chuàng)建游標(biāo),然后執(zhí)行命令,并返回結(jié)果。在獲取游標(biāo)和接受查詢結(jié)果之前,均要阻塞等候內(nèi)核通知連接可用。

其中,連接池返回連接時(shí),會(huì)根據(jù)引用連接的協(xié)程數(shù)量,返回負(fù)載最輕的連接。這也是自己定義AsyncConnectionPool類(lèi)的目的。

我的代碼位于:bottle-blog/dbservice.py

存在問(wèn)題

當(dāng)然了,這個(gè)流程目前還一些問(wèn)題。

首先就是每次輪詢拿到任務(wù)之后,都會(huì)走這么一個(gè)流程。

獲取連接 -- 新建游標(biāo) -- 執(zhí)行任務(wù) -- 關(guān)閉游標(biāo) -- 取消連接引用

本來(lái),最好的情況應(yīng)該是:在輪詢之前,就建好游標(biāo);在輪詢時(shí),直接等候內(nèi)核通知,執(zhí)行相應(yīng)任務(wù)。這樣可以減少輪詢時(shí)的任務(wù)量。但是如果協(xié)程提前對(duì)應(yīng)好連接,那就不能保證在獲取任務(wù)時(shí),保持各連接負(fù)載均衡了。

所以這一塊,還有工作要做。

還有就是epoll沒(méi)能用上,有些遺憾。

以后打算寫(xiě)點(diǎn)C語(yǔ)言的內(nèi)容,或者用Python/C API,或者用Ctypes包裝共享庫(kù),來(lái)實(shí)現(xiàn)epoll的調(diào)用。

最后,請(qǐng)?jiān)试S我吐槽一下Python的epoll相關(guān)文檔:簡(jiǎn)直太弱了!!!必須看源碼才能弄清楚功能。

網(wǎng)頁(yè)標(biāo)題:python異步函數(shù) python調(diào)用異步函數(shù)
當(dāng)前鏈接:http://m.kartarina.com/article30/hjocso.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供全網(wǎng)營(yíng)銷(xiāo)推廣關(guān)鍵詞優(yōu)化搜索引擎優(yōu)化App開(kāi)發(fā)服務(wù)器托管網(wǎng)站內(nèi)鏈

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

手機(jī)網(wǎng)站建設(shè)
主站蜘蛛池模板: 在线观看成人无码中文av天堂| 国产网红主播无码精品| 丰满少妇被猛烈进入无码| 亚洲AV无码一区二区乱孑伦AS| 无码人妻精品一区二区三区夜夜嗨| 亚洲国产成人片在线观看无码| 无码AV岛国片在线播放| 精品无码一区二区三区爱欲九九 | 国产精品无码亚洲精品2021| 无码国产精品一区二区免费| 精品久久久久久中文字幕无码| 亚洲AV无码专区日韩| 亚洲AV无码1区2区久久| 精品久久久久久中文字幕无码| 国产亚洲3p无码一区二区| 男男AV纯肉无码免费播放无码| 亚洲AV中文无码乱人伦在线视色 | 成人午夜亚洲精品无码网站| 亚洲av日韩av无码黑人| 成人免费无码视频在线网站| 国产爆乳无码视频在线观看 | 无码人妻一区二区三区兔费| 成人无码WWW免费视频| 久久亚洲精品无码av| 国产亚洲AV无码AV男人的天堂| 精品人妻无码一区二区三区蜜桃一| 亚洲熟妇无码另类久久久| 亚洲AV日韩AV无码污污网站 | 亚洲国产精品无码久久久不卡| 无码免费午夜福利片在线 | 无码不卡av东京热毛片| 国精品无码一区二区三区在线| 激情射精爆插热吻无码视频| 免费无码一区二区三区| 中文字幕av无码一区二区三区电影| 国产免费av片在线无码免费看| 人妻精品无码一区二区三区| 综合无码一区二区三区四区五区| 丰满熟妇人妻Av无码区| 亚洲av无码一区二区三区乱子伦| 国产在线观看无码免费视频|