国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

python線程池 ThreadPoolExecutor 的用法示例

瀏覽:2日期:2022-07-08 16:14:31

前言

從Python3.2開始,標(biāo)準(zhǔn)庫為我們提供了 concurrent.futures 模塊,它提供了 ThreadPoolExecutor (線程池)和ProcessPoolExecutor (進(jìn)程池)兩個(gè)類。

相比 threading 等模塊,該模塊通過 submit 返回的是一個(gè) future 對(duì)象,它是一個(gè)未來可期的對(duì)象,通過它可以獲悉線程的狀態(tài)主線程(或進(jìn)程)中可以獲取某一個(gè)線程(進(jìn)程)執(zhí)行的狀態(tài)或者某一個(gè)任務(wù)執(zhí)行的狀態(tài)及返回值:

主線程可以獲取某一個(gè)線程(或者任務(wù)的)的狀態(tài),以及返回值。當(dāng)一個(gè)線程完成的時(shí)候,主線程能夠立即知道。讓多線程和多進(jìn)程的編碼接口一致。

線程池的基本使用

# coding: utf-8from concurrent.futures import ThreadPoolExecutorimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagewith ThreadPoolExecutor(max_workers=5) as t: # 創(chuàng)建一個(gè)最大容納數(shù)量為5的線程池 task1 = t.submit(spider, 1) task2 = t.submit(spider, 2) # 通過submit提交執(zhí)行的函數(shù)到線程池中 task3 = t.submit(spider, 3) print(f'task1: {task1.done()}') # 通過done來判斷線程是否完成 print(f'task2: {task2.done()}') print(f'task3: {task3.done()}') time.sleep(2.5) print(f'task1: {task1.done()}') print(f'task2: {task2.done()}') print(f'task3: {task3.done()}') print(task1.result()) # 通過result來獲取返回值

執(zhí)行結(jié)果如下:

task1: Falsetask2: Falsetask3: Falsecrawl task1 finishedcrawl task2 finishedtask1: Truetask2: Truetask3: False1crawl task3 finished

1.使用 with 語句 ,通過 ThreadPoolExecutor 構(gòu)造實(shí)例,同時(shí)傳入 max_workers 參數(shù)來設(shè)置線程池中最多能同時(shí)運(yùn)行的線程數(shù)目。

2.使用 submit 函數(shù)來提交線程需要執(zhí)行的任務(wù)到線程池中,并返回該任務(wù)的句柄(類似于文件、畫圖),注意 submit() 不是阻塞的,而是立即返回。

3.通過使用 done() 方法判斷該任務(wù)是否結(jié)束。上面的例子可以看出,提交任務(wù)后立即判斷任務(wù)狀態(tài),顯示四個(gè)任務(wù)都未完成。在延時(shí)2.5后,task1 和 task2 執(zhí)行完畢,task3 仍在執(zhí)行中。

4.使用 result() 方法可以獲取任務(wù)的返回值。

主要方法

wait

wait(fs, timeout=None, return_when=ALL_COMPLETED)

wait 接受三個(gè)參數(shù):fs: 表示需要執(zhí)行的序列timeout: 等待的最大時(shí)間,如果超過這個(gè)時(shí)間即使線程未執(zhí)行完成也將返回return_when:表示wait返回結(jié)果的條件,默認(rèn)為 ALL_COMPLETED 全部執(zhí)行完成再返回

還是用上面那個(gè)例子來熟悉用法示例:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETEDimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagewith ThreadPoolExecutor(max_workers=5) as t: all_task = [t.submit(spider, page) for page in range(1, 5)] wait(all_task, return_when=FIRST_COMPLETED) print(’finished’) print(wait(all_task, timeout=2.5))# 運(yùn)行結(jié)果crawl task1 finishedfinishedcrawl task2 finishedcrawl task3 finishedDoneAndNotDoneFutures(done={<Future at 0x28c8710 state=finished returned int>, <Future at 0x2c2bfd0 state=finished returned int>, <Future at 0x2c1b7f0 state=finished returned int>}, not_done={<Future at 0x2c3a240 state=running>})crawl task4 finished

1.代碼中返回的條件是:當(dāng)完成第一個(gè)任務(wù)的時(shí)候,就停止等待,繼續(xù)主線程任務(wù)

2.由于設(shè)置了延時(shí), 可以看到最后只有 task4 還在運(yùn)行中

as_completed

上面雖然提供了判斷任務(wù)是否結(jié)束的方法,但是不能在主線程中一直判斷啊。最好的方法是當(dāng)某個(gè)任務(wù)結(jié)束了,就給主線程返回結(jié)果,而不是一直判斷每個(gè)任務(wù)是否結(jié)束。ThreadPoolExecutorThreadPoolExecutor 中 的 as_completed() 就是這樣一個(gè)方法,當(dāng)子線程中的任務(wù)執(zhí)行完后,直接用 result() 獲取返回結(jié)果

用法如下:

# coding: utf-8from concurrent.futures import ThreadPoolExecutor, as_completedimport timedef spider(page): time.sleep(page) print(f'crawl task{page} finished') return pagedef main(): with ThreadPoolExecutor(max_workers=5) as t: obj_list = [] for page in range(1, 5): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(f'main: {data}')# 執(zhí)行結(jié)果crawl task1 finishedmain: 1crawl task2 finishedmain: 2crawl task3 finishedmain: 3crawl task4 finishedmain: 4

as_completed() 方法是一個(gè)生成器,在沒有任務(wù)完成的時(shí)候,會(huì)一直阻塞,除非設(shè)置了 timeout。

當(dāng)有某個(gè)任務(wù)完成的時(shí)候,會(huì) yield 這個(gè)任務(wù),就能執(zhí)行 for 循環(huán)下面的語句,然后繼續(xù)阻塞住,循環(huán)到所有的任務(wù)結(jié)束。同時(shí),先完成的任務(wù)會(huì)先返回給主線程。

map

map(fn, *iterables, timeout=None)

fn: 第一個(gè)參數(shù) fn 是需要線程執(zhí)行的函數(shù);iterables:第二個(gè)參數(shù)接受一個(gè)可迭代對(duì)象;timeout: 第三個(gè)參數(shù) timeout 跟 wait() 的 timeout 一樣,但由于 map 是返回線程執(zhí)行的結(jié)果,如果 timeout小于線程執(zhí)行時(shí)間會(huì)拋異常 TimeoutError。

用法如下:

import timefrom concurrent.futures import ThreadPoolExecutordef spider(page): time.sleep(page) return pagestart = time.time()executor = ThreadPoolExecutor(max_workers=4)i = 1for result in executor.map(spider, [2, 3, 1, 4]): print('task{}:{}'.format(i, result)) i += 1# 運(yùn)行結(jié)果task1:2task2:3task3:1task4:4

使用 map 方法,無需提前使用 submit 方法,map 方法與 python 高階函數(shù) map 的含義相同,都是將序列中的每個(gè)元素都執(zhí)行同一個(gè)函數(shù)。

上面的代碼對(duì)列表中的每個(gè)元素都執(zhí)行 spider() 函數(shù),并分配各線程池。

可以看到執(zhí)行結(jié)果與上面的 as_completed() 方法的結(jié)果不同,輸出順序和列表的順序相同,就算 1s 的任務(wù)先執(zhí)行完成,也會(huì)先打印前面提交的任務(wù)返回的結(jié)果。

多線程實(shí)戰(zhàn)

以某網(wǎng)站為例,演示線程池和單線程兩種方式爬取的差異

# coding: utf-8import requestsfrom concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport jsonfrom requests import adaptersfrom proxy import get_proxiesheaders = { 'Host': 'splcgk.court.gov.cn', 'Origin': 'https://splcgk.court.gov.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', 'Referer': 'https://splcgk.court.gov.cn/gzfwww/ktgg',}url = 'https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1'def spider(page): data = { 'bt': '', 'fydw': '', 'pageNum': page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_datadef main(): with ThreadPoolExecutor(max_workers=10) as t: obj_list = [] begin = time.time() for page in range(1, 15): obj = t.submit(spider, page) obj_list.append(obj) for future in as_completed(obj_list): data = future.result() print(data) print(’*’ * 50) times = time.time() - begin print(times)if __name__ == '__main__': main()

運(yùn)行結(jié)果:

python線程池 ThreadPoolExecutor 的用法示例

單線程實(shí)戰(zhàn)

下面我們可以使用單線程來爬取,代碼基本和上面的一樣,加個(gè)單線程函數(shù)代碼如下:

# coding: utf-8import requestsfrom concurrent.futures import ThreadPoolExecutor, as_completedimport timeimport jsonfrom requests import adaptersfrom proxy import get_proxiesheaders = { 'Host': 'splcgk.court.gov.cn', 'Origin': 'https://splcgk.court.gov.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36', 'Referer': 'https://splcgk.court.gov.cn/gzfwww/ktgg',}url = 'https://splcgk.court.gov.cn/gzfwww/ktgglist?pageNo=1'def spider(page): data = { 'bt': '', 'fydw': '', 'pageNum': page, } for _ in range(5): try: response = requests.post(url, headers=headers, data=data, proxies=get_proxies()) json_data = response.json() except (json.JSONDecodeError, adapters.SSLError): continue else: break else: return {} return json_datadef single(): begin = time.time() for page in range(1, 15): data = spider(page) print(data) print(’*’ * 50) times = time.time() - begin print(times)if __name__ == '__main__': single()

運(yùn)行結(jié)果:

python線程池 ThreadPoolExecutor 的用法示例

可以看到,總共花了 19 秒。真是肉眼可見的差距啊!如果數(shù)據(jù)量大的話,運(yùn)行時(shí)間差距會(huì)更大!

以上就是python線程池 ThreadPoolExecutor 的用法示例的詳細(xì)內(nèi)容,更多關(guān)于python線程池 ThreadPoolExecutor 的用法及實(shí)戰(zhàn)的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!

標(biāo)簽: Python 編程
相關(guān)文章:
主站蜘蛛池模板: 国产三级在线免费观看 | 一级一级一片免费高清 | 亚洲毛片一级巨乳 | 精品久久成人 | 精品久久久久国产免费 | 黄a视频 | 超级碰碰碰视频视频在线视频 | 成人欧美日韩 | 91久久福利国产成人精品 | 超91精品手机国产在线 | 亚洲精品一区二区不卡 | 亚洲www色| 九九国产视频 | 精品欧美一区二区三区在线观看 | 欧美日韩精品在线视频 | 欧美人成人亚洲专区中文字幕 | 国产日本欧美亚洲精品视 | 国产精品观看 | 亚洲精品片 | 美女视频免费黄的 | 96精品视频在线播放免费观看 | 欧美视频免费一区二区三区 | 国产高清精品毛片基地 | 一级片欧美 | 26uuu影院亚洲欧美综合 | 亚洲精品国产一区二区在线 | 精品毛片视频 | 久色tv| 伊人婷婷色香五月综合缴激情 | 免费男女视频 | 亚洲精品国产成人专区 | 99精品在线看 | 日韩在线看片中文字幕不卡 | 国产免费久久精品99re丫y | 精品久久久久久乐 | 欧美男女网站 | 国产欧美另类久久久品 | 明星国产欧美日韩在线观看 | 国产一级久久免费特黄 | 欧美 亚洲 中文字幕 | 亚洲日本在线观看 |