国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術文章
文章詳情頁

5分鐘快速掌握Python定時任務框架的實現

瀏覽:31日期:2022-06-29 10:20:20
APScheduler 簡介

在實際開發中我們經常會碰上一些重復性或周期性的任務,比如像每天定時爬取某個網站的數據、一定周期定時運行代碼訓練模型等,類似這類的任務通常需要我們手動來進行設定或調度,以便其能夠在我們設定好的時間內運行。

在 Windows 上我們可以通過計劃任務來手動實現,而在 Linux 系統上往往我們會用到更多關于 crontab 的相關操作。但手動管理并不是一個很好的選擇,如果我們需要有十幾個不同的定時任務需要管理,那么每次通過人工來進行干預未免有些笨拙,那這時候就真的是「人工智能」了。

所以將這些定時任務的調度代碼化才是能夠讓我們很好地從這種手動管理的純人力操作中解脫出來。

在 Python 生態中對于定時任務的一些操作主要有那么幾個:

schedule:第三方模塊,該模塊適合比較輕量級的一些調度任務,但卻不適用于復雜時間的調度 APScheduler:第三方定時任務框架,是對 Java 第三方定時任務框架 Quartz 的模仿與移植,能提供比 schedule 更復雜的應用場景,并且各種組件都是模塊化,易于使用與二次開發。 Celery Beat:屬于 celery 這分布式任務隊列第三方庫下的一個定時任務組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費一定的時間在環境搭建上,但在高版本中已經不支持 Windows。

所以為了滿足能夠相對復雜的時間條件,又不需要在前期的環境搭建上花費很多時間的前提下,選擇 APScheduler 來對我們的調度任務或定時任務進行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關 APScheduler 的使用。

APScheduler 概念與組件

雖然說官方文檔上的內容不是很多,而且所列舉的 API 不是很多,但這側面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個框架的一些概念簡單了解,主要有那么以下幾個:

觸發器(trigger) 任務持久化(job stores) 執行器(executor) 調度器(scheduler)觸發器(trigger)

所謂的觸發器就是用以觸發定時任務的組件,在 APScheduler 中主要是指時間觸發器,并且主要有三類時間觸發器可供使用:

date:日期觸發器。日期觸發器主要是在某一日期時間點上運行任務時調用,是 APScheduler 里面最簡單的一種觸發器。所以通常也適用于一次性的任務或作業調度。 interval:間隔觸發器。間隔觸發器是在日期觸發器基礎上擴展了對時間部分,比如時、分、秒、天、周這幾個部分的設定。是我們用以對重復性任務進行設定或調度的一個常用調度器。設定了時間部分之后,從起始日期開始(默認是當前)會按照設定的時間去執行任務。 cron:cron 表達式觸發器。cron 表達式觸發器就等價于我們 Linux 上的 crontab,它主要用于更復雜的日期時間進行設定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表達式,最多只支持到 5 位。任務持久化(job stores)

任務持久化主要是用于將設定好的調度任務進行存儲,即便是程序因為意外情況,如斷電、電腦或服務器重啟時,只要重新運行程序時,APScheduler 就會根據對存儲好的調度任務結果進行判斷,如果出現已經過期但未執行的情況會進行相應的操作。

APScheduler 為我們提供了多種持久化任務的途徑,默認是使用 memory 也就是內存的形式,但內存并不是持久化最好的方式。最好的方式則是通過像數據庫這樣的載體來將我們的定時任務寫入到磁盤當中,只要磁盤沒有損壞就能將數據給恢復。

APScheduler 支持的且常用的數據庫主要有:

sqlalchemy 形式的數據庫,這里就主要是指各種傳統的關系型數據庫,如 MySQL、PostgreSQL、SQLite 等。 mongodb 非結構化的 Mongodb 數據庫,該類型數據庫經常用于對非結構化或版結構化數據的存儲或操作,如 JSON。 redis 內存數據庫,通常用作數據緩存來使用,當然通過一些主從復制等方式也能實現當中數據的持久化或保存。

通常我們可以在創建 Scheduler 實例時創建,或是單獨為任務指定。配置的方式相對簡單,我們只需要指定對應的數據庫鏈接即可。

執行器(executor)

執行器顧名思義就是執行我們任務的對象,在計算機內通常要么是 CPU 調度任務,要么是單獨維護一個線程來運行任務。所以 APScheduler 里的執行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進程池兩種。

當然如果是和協程或異步相關的任務調度,還可以使用對應的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執行器。

調度器(scheduler)

調度器的選擇主要取決于你當前的程序環境以及 APScheduler 的用途。根據用途的不同,APScheduler 又提供了以下幾種調度器:

BlockingScheduler:阻塞調度器,當程序中沒有任何存在主進程之中運行東西時,就則使用該調度器。 BackgroundScheduler:后臺調度器,在不使用后面任何的調度器且希望在應用程序內部運行時的后臺啟動時才進行使用,如當前你已經開啟了一個 Django 或 Flask 服務。 AsyncIOScheduler:AsyncIO 調度器,如果代碼是通過 asyncio 模塊進行異步操作,使用該調度器。 GeventScheduler:Gevent 調度器,如果代碼是通過 gevent 模塊進行協程操作,使用該調度器 TornadoScheduler:Tornado 調度器,在 Tornado 框架中使用 TwistedScheduler:Twisted 調度器,在基于 Twisted 的框架或應用程序中使用 QtScheduler:Qt 調度器,在構建 Qt 應用中進行使用。

通常情況下如果不是和 Web 項目或應用集成共存,那么往往都首選 BlockingScheduler 調度器來進行操作,它會在當前進程中啟動相應的線程來進行任務調度與處理;反之,如果是和 Web 項目或應用共存,那么需要選擇 BackgroundScheduler 調度器,因為它不會干擾當前應用的線程或進程狀況。

基于對以上的概念和組件認識,我們就能基本上摸清 APScheduler 的運行流程:

設定調度器(scheduler)用以對任務的調度與安排進行全局統籌 對相應的函數或方法上設定相應的觸發器(trigger),并添加到調度器中 如有任務持久化(job stores)需要則需要設定對應的持久化層,否則默認使用內存存儲任務 當觸發器被觸發時,就將任務交由執行器(executor)進行執行APScheduler 快速上手

雖然 APScheduler 里面的概念和組件看起來有點多,但在使用上并不算很復雜,我們可以通過本節的示例就能夠很快使用。

選擇對應的 scheduler

在使用之前我們需要先實例化一個 scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補全的提示來獲取相應的 scheduler 對象。

這里我直接選取了最基礎的 BlockingScheduler:

# main.py from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()

配置 scheduler

對于 scheduler 的一些配置我們可以直接在實例化對象時就進行配置,當然也可以在創建實例化對象之后再進行配置。

實例化時進行參數配置:

# main.pyfrom datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler # 任務持久化 使用 SQLitejobstores = { ’default’: SQLAlchemyJobStore(url = ’sqlite:///jobs.db’)}# 執行器配置executors = { ’default’: ThreadPoolExecutor(20),}# 關于 Job 的相關配置,見官方文檔 APIjob_defaults = { ’coalesce’: False, ’next_run_time’: datetime.now()}scheduler = BlockingScheduler( jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = ’Asia/Shanghai’)

或是通過 scheduler.configure 方法進行同樣的操作:

scheduler = BlockingScheduler()scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=’Asia/Shanghai’)添加并執行你的任務

創建 scheduler 對象之后,我們需要調用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執行的函數進行注冊。前者是以傳參的形式指定對應的函數名,而后者則是以裝飾器的形式直接對我們要執行的函數進行修飾。

比如我現在有一個輸出此時此刻時間的函數 now():

from datetime import datetime def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}')

然后我打算每 5 秒的時候運行一次,那我們使用 add_job() 可以這樣寫:

if __name__ == ’__main__’: scheduler.add_job(now, trigger = 'interval', args = ('interval',), seconds = 5) scheduler.start()

在調用 start() 方法之后調度器就會開始執行,并在控制臺上看到對應的結果了:

trigger:interval -> 2021-01-16 21:19:43.356674trigger:interval -> 2021-01-16 21:19:46.679849trigger:interval -> 2021-01-16 21:19:48.356595

當然使用 @scheduled_job 的方式來裝飾我們的任務或許會更加自由一些,于是上面的例子就可以寫成這樣:

@scheduler.scheduled_job(trigger = 'interval', args = ('interval',), seconds = 5)def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}') if __name__ == ’__main__’: scheduler.start()

運行之后就會在控制臺看到同樣的結果了。

不過需要注意的是,添加任務一定要在 start() 方法執行前調用,否則會找不到任務或是拋出異常。

將 APScheduler 集成到 Web 項目中

如果你是正在做有關的 Web 項目且存在一些定時任務,那么得益于 APScheduler 由于多樣的調度器,我們能夠將其和我們的項目結合到一起。

如果你正在使用 Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關的文檔,但只要你了解了前面我所介紹的有關于 APScheduler 的概念和組件,你就能很輕易地看懂這個第三方庫倉庫里的示例代碼。

如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些對任務或作業的增刪改查操作,我們可以自己編寫一套合適的 API。

這里我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結構如下:

temp-scheduler├── config.py # 配置項├── main.py # API 文件└── scheduler.py # APScheduler 相關設置安裝依賴

這里我們需要的依賴不多,只需要簡單幾個即可:

pip install fastapi apscheduler sqlalchemy uvicorn配置項

如果項目中模塊過多,那么使用一個文件或模塊來進行統一管理是最好的選擇。這里的 config.py 我們主要像 Flask 的配置那樣簡單設定:

from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler class SchedulerConfig: JOBSTORES = {'default': SQLAlchemyJobStore(url='sqlite:///job.db')} EXECUTORS = {'default': ThreadPoolExecutor(20)} JOB_DEFAULTS = {'coalesce': False} @classmethod def to_dict(cls): return { 'jobstores': cls.JOBSTORES, 'executors': cls.EXECUTORS, 'job_defaults': cls.JOB_DEFAULTS, }

在 SchedulerConfig 配置項中我們可以自己實現一個 to_dict() 類方法,以便我們后續傳參時通過解包的方式直接傳入配置參數即可。

Scheduler 相關設置

scheduler.py 模塊的設定也比較簡單,即設定對應的 scheduler 調度器即可。由于是演示 demo 我還將要定期執行的任務也放在了這個模塊當中:

import loggingfrom datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from config import SchedulerConfig scheduler = BackgroundScheduler()logger = logging.getLogger(__name__) def init_scheduler() -> None: # config scheduler scheduler.configure(**SchedulerConfig.to_dict()) logger.info('scheduler is running...') # schedule test scheduler.add_job( func=mytask, trigger='date', args=('APScheduler Initialize.',), next_run_time=datetime.now(), ) scheduler.start() def mytask(message: str) -> None: print(f'[{datetime.now()}] message: {message}')

在這一部分中:

init_scheduler() 方法主要用于在 API 服務啟動時被調用,然后對 scheduler 對象的配置以及測試 mytask() 則是我們要定期執行的任務,后續我們可以通過 APScheduler 提供的方法來自行添加任務API 設置

在 main.py 模塊就主要存放著我們由 FastAPI 所構建的相關 API。如果在后續開發時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達到路由的分發與管理,類似于 Flask 的藍圖模式。

import loggingimport uuidfrom datetime import datetimefrom typing import Any, Dict, Optional, Sequence, Union from fastapi import FastAPIfrom pydantic import BaseModel from scheduler import init_scheduler, mytask, scheduler logger = logging.getLogger(__name__) app = FastAPI(title='APScheduler API')app.add_event_handler('startup', init_scheduler) class Job(BaseModel): id: Union[int, str, uuid.UUID] name: Optional[str] = None func: Optional[str] = None args: Optional[Sequence[Optional[str]]] = None kwargs: Optional[Dict[str, Any]] = None executor: Optional[str] = None misfire_grace_time: Optional[str] = None coalesce: Optional[bool] = None max_instances: Optional[int] = None next_run_time: Optional[Union[str, datetime]] = None @app.post('/add')def add_job( message: str, trigger: str, trigger_args: Optional[dict], id: Union[str, int, uuid.UUID],): try: scheduler.add_job( func=mytask, trigger=trigger, kwargs={'message': message}, id=id, **trigger_args, ) except Exception as e: logger.exception(e.args) return {'status_code': 0, 'message': '添加失敗'} return {'status_code': 1, 'message': '添加成功'} @app.delete('/delete/{id}')def delete_job(id: Union[str, int, uuid.UUID]): '''delete exist job by id''' try: scheduler.remove_job(job_id=id) except Exception: return dict( message='刪除失敗', status_code=0, ) return dict( message='刪除成功', status_code=1, ) @app.put('/reschedule/{id}')def reschedule_job( id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]): try: scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args) except Exception as e: logger.exception(e.args) return dict( message='修改失敗', status_code=0, ) return dict( message='修改成功', status_code=1, ) @app.get('/job')def get_all_jobs(): jobs = None try: job_list = scheduler.get_jobs() if job_list: jobs = [Job(**task.__getstate__()) for task in job_list] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, ) @app.get('/job/{id}')def get_job_by_id(id: Union[int, str, uuid.UUID]): jobs = [] try: job = scheduler.get_job(job_id=id) if job: jobs = [Job(**job.__getstate__())] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, )

以上代碼看起來很多,其實核心的就那么幾點:

FastAPI 對象 app 的初始化。這里用到的 add_event_handler() 方法就有點像 Flask 中的 before_first_request,會在 Web 服務請求伊始進行操作,理解為初始化相關的操作即可。

API 接口路由。路由通過 app 對象下的對應 HTTP 方法來實現,如 GET、POST、PUT 等。這里的裝飾器用法其實也和 Flask 很類似,就不多贅述。

scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創建好的 scheduler 對象之后就可以直接用來做增刪改查的操作:

增:使用 add_job() 方法,其主要的參數是要運行的函數(或方法)、觸發器以及觸發器參數等 刪:使用 delete_job() 方法,我們需要傳入一個對應任務的 id 參數,用以能夠查找到對應的任務 改:使用 reschedule_job() 方法,這里也需要一個對應任務的 id 參數,以及需要重新修改的觸發器及其參數 查:使用 get_jobs() 和 get_job() 兩個方法,前者是直接獲取到當前調度的所有任務,返回的是一個包含了 APScheduler.job.Job 對象的列表,而后者是通過 id 參數來查找對應的任務對象;這里我通過底層源碼使用 __getstate__() 來獲取到任務的相關信息,這些信息我們通過事先設定好的 Job 對象來對其進行序列化,最后將信息從接口中返回。運行

完成以上的所有操作之后,我們就可以打開控制臺,進入到該目錄下并激活我們的虛擬環境,之后運行:

uvicorn main:app

之后我們就能在 FastAPI 默認的地址 http://127.0.0.1:8000/docs 中看到關于全部接口的 Swagger 文檔頁面了:

5分鐘快速掌握Python定時任務框架的實現

fastapi 集成的 swagger 頁面

之后我們可以直接在文檔里面或使用 Postman 來自己進行接口測試即可。

結尾

本文介紹了有關于 APScheduler 框架的概念及其用法,并進行了簡單的實踐。

得益于 APScheduler 的模塊化設計才可以讓我們更方便地去理解、使用它,并將其運用到我們實際的開發過程中。

從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經在開始重構 4.0 版本,當中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。

但如果現階段你正打算使用或已經使用 APScheduler 用于實際生產中,那么希望本文能對會你有所幫助。

到此這篇關于5分鐘快速掌握Python定時任務框架的實現的文章就介紹到這了,更多相關Python 定時任務內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Python 編程
相關文章:
主站蜘蛛池模板: 美女双腿打开让男人桶爽网站 | 女人张开腿让男人桶免费最新 | 午夜男人女人爽爽爽视频 | 久久欧美成人精品丝袜 | 亚洲欧美日韩精品久久 | 18性欧美69| 免费国产99久久久香蕉 | 国产午夜久久影院 | 亚洲成在人 | 国产真真人女人特级毛片 | 国产欧美日韩在线观看一区二区三区 | 欧美色性 | 久久综合狠狠综合久久97色 | 亚洲高清在线观看看片 | 国产精品久久久久久久久久久威 | aaaaaaa一级毛片 | 精品国产一区在线观看 | 久久视频6免费观看视频精品 | 久久久香蕉 | 成年人免费在线视频观看 | 99国产福利视频在线观看 | 色妇女影院 | 久草在现 | 在线视频亚洲一区 | 中文字幕无线精品乱码一区 | 国产日韩精品在线 | 最新国产三级在线观看不卡 | 国产精品毛片一区 | 在线视频中文字幕 | 最新在线步兵区 | 一区二区三区四区视频 | 狼人激情网 | 日本aaaa级 | 亚洲高清国产一线久久 | 成年女人毛片 | 久久中文字幕在线观看 | 夜色毛片永久免费 | 韩国美女毛片 | 国产精品亚洲欧美云霸高清 | 国产高清精品一级毛片 | 精品99久久|