国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

Python 使用生成器代替線程的方法

瀏覽:60日期:2022-07-15 09:58:30

問題

你想使用生成器(協(xié)程)替代系統(tǒng)線程來實現(xiàn)并發(fā)。這個有時又被稱為用戶級線程或綠色線程。

解決方案

要使用生成器實現(xiàn)自己的并發(fā),你首先要對生成器函數(shù)和 yield 語句有深刻理解。 yield 語句會讓一個生成器掛起它的執(zhí)行,這樣就可以編寫一個調(diào)度器, 將生成器當做某種“任務(wù)”并使用任務(wù)協(xié)作切換來替換它們的執(zhí)行。 要演示這種思想,考慮下面兩個使用簡單的 yield 語句的生成器函數(shù):

# Two simple generator functionsdef countdown(n): while n > 0: print(’T-minus’, n) yield n -= 1 print(’Blastoff!’)def countup(n): x = 0 while x < n: print(’Counting up’, x) yield x += 1

這些函數(shù)在內(nèi)部使用yield語句,下面是一個實現(xiàn)了簡單任務(wù)調(diào)度器的代碼:

from collections import dequeclass TaskScheduler: def __init__(self): self._task_queue = deque() def new_task(self, task): ’’’ Admit a newly started task to the scheduler ’’’ self._task_queue.append(task) def run(self): ’’’ Run until there are no more tasks ’’’ while self._task_queue: task = self._task_queue.popleft() try:# Run until the next yield statementnext(task)self._task_queue.append(task) except StopIteration:# Generator is no longer executingpass# Example usesched = TaskScheduler()sched.new_task(countdown(10))sched.new_task(countdown(5))sched.new_task(countup(15))sched.run()

TaskScheduler 類在一個循環(huán)中運行生成器集合——每個都運行到碰到y(tǒng)ield語句為止。 運行這個例子,輸出如下:

T-minus 10T-minus 5Counting up 0T-minus 9T-minus 4Counting up 1T-minus 8T-minus 3Counting up 2T-minus 7T-minus 2...

到此為止,我們實際上已經(jīng)實現(xiàn)了一個“操作系統(tǒng)”的最小核心部分。 生成器函數(shù)就是任務(wù),而yield語句是任務(wù)掛起的信號。 調(diào)度器循環(huán)檢查任務(wù)列表直到?jīng)]有任務(wù)要執(zhí)行為止。

實際上,你可能想要使用生成器來實現(xiàn)簡單的并發(fā)。 那么,在實現(xiàn)actor或網(wǎng)絡(luò)服務(wù)器的時候你可以使用生成器來替代線程的使用。

下面的代碼演示了使用生成器來實現(xiàn)一個不依賴線程的actor:

from collections import dequeclass ActorScheduler: def __init__(self): self._actors = {} # Mapping of names to actors self._msg_queue = deque() # Message queue def new_actor(self, name, actor): ’’’ Admit a newly started actor to the scheduler and give it a name ’’’ self._msg_queue.append((actor,None)) self._actors[name] = actor def send(self, name, msg): ’’’ Send a message to a named actor ’’’ actor = self._actors.get(name) if actor: self._msg_queue.append((actor,msg)) def run(self): ’’’ Run as long as there are pending messages. ’’’ while self._msg_queue: actor, msg = self._msg_queue.popleft() try: actor.send(msg) except StopIteration: pass# Example useif __name__ == ’__main__’: def printer(): while True: msg = yield print(’Got:’, msg) def counter(sched): while True: # Receive the current count n = yield if n == 0:break # Send to the printer task sched.send(’printer’, n) # Send the next count to the counter task (recursive) sched.send(’counter’, n-1) sched = ActorScheduler() # Create the initial actors sched.new_actor(’printer’, printer()) sched.new_actor(’counter’, counter(sched)) # Send an initial message to the counter to initiate sched.send(’counter’, 10000) sched.run()

完全弄懂這段代碼需要更深入的學(xué)習(xí),但是關(guān)鍵點在于收集消息的隊列。 本質(zhì)上,調(diào)度器在有需要發(fā)送的消息時會一直運行著。 計數(shù)生成器會給自己發(fā)送消息并在一個遞歸循環(huán)中結(jié)束。

下面是一個更加高級的例子,演示了使用生成器來實現(xiàn)一個并發(fā)網(wǎng)絡(luò)應(yīng)用程序:

from collections import dequefrom select import select# This class represents a generic yield event in the schedulerclass YieldEvent: def handle_yield(self, sched, task): pass def handle_resume(self, sched, task): pass# Task Schedulerclass Scheduler: def __init__(self): self._numtasks = 0 # Total num of tasks self._ready = deque() # Tasks ready to run self._read_waiting = {} # Tasks waiting to read self._write_waiting = {} # Tasks waiting to write # Poll for I/O events and restart waiting tasks def _iopoll(self): rset,wset,eset = select(self._read_waiting,self._write_waiting,[]) for r in rset: evt, task = self._read_waiting.pop(r) evt.handle_resume(self, task) for w in wset: evt, task = self._write_waiting.pop(w) evt.handle_resume(self, task) def new(self,task): ’’’ Add a newly started task to the scheduler ’’’ self._ready.append((task, None)) self._numtasks += 1 def add_ready(self, task, msg=None): ’’’ Append an already started task to the ready queue. msg is what to send into the task when it resumes. ’’’ self._ready.append((task, msg)) # Add a task to the reading set def _read_wait(self, fileno, evt, task): self._read_waiting[fileno] = (evt, task) # Add a task to the write set def _write_wait(self, fileno, evt, task): self._write_waiting[fileno] = (evt, task) def run(self): ’’’ Run the task scheduler until there are no tasks ’’’ while self._numtasks: if not self._ready: self._iopoll() task, msg = self._ready.popleft() try: # Run the coroutine to the next yield r = task.send(msg) if isinstance(r, YieldEvent): r.handle_yield(self, task) else: raise RuntimeError(’unrecognized yield event’) except StopIteration: self._numtasks -= 1# Example implementation of coroutine-based socket I/Oclass ReadSocket(YieldEvent): def __init__(self, sock, nbytes): self.sock = sock self.nbytes = nbytes def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): data = self.sock.recv(self.nbytes) sched.add_ready(task, data)class WriteSocket(YieldEvent): def __init__(self, sock, data): self.sock = sock self.data = data def handle_yield(self, sched, task): sched._write_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): nsent = self.sock.send(self.data) sched.add_ready(task, nsent)class AcceptSocket(YieldEvent): def __init__(self, sock): self.sock = sock def handle_yield(self, sched, task): sched._read_wait(self.sock.fileno(), self, task) def handle_resume(self, sched, task): r = self.sock.accept() sched.add_ready(task, r)# Wrapper around a socket object for use with yieldclass Socket(object): def __init__(self, sock): self._sock = sock def recv(self, maxbytes): return ReadSocket(self._sock, maxbytes) def send(self, data): return WriteSocket(self._sock, data) def accept(self): return AcceptSocket(self._sock) def __getattr__(self, name): return getattr(self._sock, name)if __name__ == ’__main__’: from socket import socket, AF_INET, SOCK_STREAM import time # Example of a function involving generators. This should # be called using line = yield from readline(sock) def readline(sock): chars = [] while True: c = yield sock.recv(1) if not c:break chars.append(c) if c == b’n’:break return b’’.join(chars) # Echo server using generators class EchoServer: def __init__(self,addr,sched): self.sched = sched sched.new(self.server_loop(addr)) def server_loop(self,addr): s = Socket(socket(AF_INET,SOCK_STREAM)) s.bind(addr) s.listen(5) while True:c,a = yield s.accept()print(’Got connection from ’, a)self.sched.new(self.client_handler(Socket(c))) def client_handler(self,client): while True:line = yield from readline(client)if not line: breakline = b’GOT:’ + linewhile line: nsent = yield client.send(line) line = line[nsent:] client.close() print(’Client closed’) sched = Scheduler() EchoServer((’’,16000),sched) sched.run()

這段代碼有點復(fù)雜。不過,它實現(xiàn)了一個小型的操作系統(tǒng)。 有一個就緒的任務(wù)隊列,并且還有因I/O休眠的任務(wù)等待區(qū)域。 還有很多調(diào)度器負責(zé)在就緒隊列和I/O等待區(qū)域之間移動任務(wù)。

討論

在構(gòu)建基于生成器的并發(fā)框架時,通常會使用更常見的yield形式:

def some_generator(): ... result = yield data ...

使用這種形式的yield語句的函數(shù)通常被稱為“協(xié)程”。 通過調(diào)度器,yield語句在一個循環(huán)中被處理,如下:

f = some_generator()# Initial result. Is None to start since nothing has been computedresult = Nonewhile True: try: data = f.send(result) result = ... do some calculation ... except StopIteration: break

這里的邏輯稍微有點復(fù)雜。不過,被傳給 send() 的值定義了在yield語句醒來時的返回值。 因此,如果一個yield準備在對之前yield數(shù)據(jù)的回應(yīng)中返回結(jié)果時,會在下一次 send() 操作返回。 如果一個生成器函數(shù)剛開始運行,發(fā)送一個None值會讓它排在第一個yield語句前面。

除了發(fā)送值外,還可以在一個生成器上面執(zhí)行一個 close() 方法。 它會導(dǎo)致在執(zhí)行yield語句時拋出一個 GeneratorExit 異常,從而終止執(zhí)行。 如果進一步設(shè)計,一個生成器可以捕獲這個異常并執(zhí)行清理操作。 同樣還可以使用生成器的 throw() 方法在yield語句執(zhí)行時生成一個任意的執(zhí)行指令。 一個任務(wù)調(diào)度器可利用它來在運行的生成器中處理錯誤。

最后一個例子中使用的 yield from 語句被用來實現(xiàn)協(xié)程,可以被其它生成器作為子程序或過程來調(diào)用。 本質(zhì)上就是將控制權(quán)透明的傳輸給新的函數(shù)。 不像普通的生成器,一個使用 yield from 被調(diào)用的函數(shù)可以返回一個作為 yield from 語句結(jié)果的值。 關(guān)于 yield from 的更多信息可以在 PEP 380 中找到。

最后,如果使用生成器編程,要提醒你的是它還是有很多缺點的。 特別是,你得不到任何線程可以提供的好處。例如,如果你執(zhí)行CPU依賴或I/O阻塞程序, 它會將整個任務(wù)掛起直到操作完成。為了解決這個問題, 你只能選擇將操作委派給另外一個可以獨立運行的線程或進程。 另外一個限制是大部分Python庫并不能很好的兼容基于生成器的線程。 如果你選擇這個方案,你會發(fā)現(xiàn)你需要自己改寫很多標準庫函數(shù)。 作為本節(jié)提到的協(xié)程和相關(guān)技術(shù)的一個基礎(chǔ)背景,可以查看 PEP 342 和 “協(xié)程和并發(fā)的一門有趣課程”

PEP 3156 同樣有一個關(guān)于使用協(xié)程的異步I/O模型。 特別的,你不可能自己去實現(xiàn)一個底層的協(xié)程調(diào)度器。 不過,關(guān)于協(xié)程的思想是很多流行庫的基礎(chǔ), 包括 gevent, greenlet, Stackless Python 以及其他類似工程。

以上就是Python 使用生成器代替線程的方法的詳細內(nèi)容,更多關(guān)于Python 生成器代替線程的資料請關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 久久精品国产亚洲a | 成人做爰视频www视频 | 新婚第一次一级毛片 | 亚洲性欧美 | 最新亚洲精品 | 欧美特级 | 一品道一本香蕉视频 | 欧美成人网7777视频 | 精品亚洲综合久久中文字幕 | 国产日比视频 | 亚洲永久中文字幕在线 | 成年人视频在线免费播放 | 国外免费一级 | 欧美成人全部免费观看1314色 | 日韩精品一区二区三区免费视频 | 亚洲天堂视频在线免费观看 | 在线观看国产精品日本不卡网 | 欧美视频在线观看网站 | 三级色网站 | 国产 magnet | 日本妞xxxxxxxxx69 | 欧美另类孕交免费观看 | 久久久久99精品成人片三人毛片 | 特级毛片www欧美 | 成年人在线观看网站 | 日本aaaa片毛片免费 | 亚洲最大免费视频网 | 成人午夜久久精品 | 9cao视频精品 | 伊在人亚洲香蕉精品区 | 久久国产国内精品对话对白 | 欧美亚洲中日韩中文字幕在线 | 99视频在线播放 | 亚州人成网在线播放 | 久久精品一区二区三区不卡牛牛 | 国产精亚洲视频 | 国产精品成人久久久久 | 成人牲交一极毛片 | 99久久精品免费看国产免费 | 亚洲午夜精品在线 | 中文一级国产特级毛片视频 |