Preface
在工作中多少都會涉及到一些定時任務,比如定時郵件提醒等.本文通過開源項目 schedule 來學習定時任務調度是如何工作的,以及基於此實現一個 web 版本的提醒工具.
schedule 簡介
既然 schedule 說是給人類使用的作業調度器, 先來看看作者給提供的例子:
- import schedule
- import time
- def job():
- print("I'm working...")
- schedule.every(10).minutes.do(job)
- schedule.every().hour.do(job)
- schedule.every().day.at("10:30").do(job)
- schedule.every().monday.do(job)
- schedule.every().wednesday.at("13:15").do(job)
- while True:
- schedule.run_pending()
- time.sleep(1)
上面的意思就是:
schedule 源碼學習
首先看一下有哪些類, 如圖 ( 使用 pycharm 導出的 ):
可以看到只有三個類,源碼分析就圍繞這三個類:
Class CancelJob
- class CancelJob(object):
- pass
Class Scheduler
為了使代碼緊湊,這裡刪除了註釋,剩下也就 34 行代碼:
- class Scheduler(object):
- """
- Objects instantiated by the :class:`Scheduler
` are - factories to create jobs, keep record of scheduled jobs and
- handle their execution.
- """
- def __init__(self):
- self.jobs = []
- def run_pending(self):
- runnable_jobs = (job for job in self.jobs if job.should_run)
- for job in sorted(runnable_jobs):
- self._run_job(job)
- def run_all(self, delay_seconds=0):
- logger.info('Running *all* %i jobs with %is delay inbetween',
- len(self.jobs), delay_seconds)
- for job in self.jobs[:]:
- self._run_job(job)
- time.sleep(delay_seconds)
- def clear(self, tag=None):
- if tag is None:
- del self.jobs[:]
- else:
- self.jobs[:] = (job for job in self.jobs if tag not in job.tags)
- def cancel_job(self, job):
- try:
- self.jobs.remove(job)
- except ValueError:
- pass
- def every(self, interval=1):
- job = Job(interval, self)
- return job
- def _run_job(self, job):
- ret = job.run()
- if isinstance(ret, CancelJob) or ret is CancelJob:
- self.cancel_job(job)
- @property
- def next_run(self):
- if not self.jobs:
- return None
- return min(self.jobs).next_run
- @property
- def idle_seconds(self):
- return (self.next_run - datetime.datetime.now()).total_seconds()
Class Job
Job 是整個定時任務的核心. 主要功能就是根據創建 Job 時的參數, 得到下一次運行的時間. 代碼如下,稍微有點長 (會省略部分代碼,可以看 源碼). 這個類別提供的ˊ方法也不是很多, 有很多邏輯是一樣的. 簡單介紹一下建構子的參數:
再來看一下幾個重要的方法:
* __lt__:
* second、seconds:
* monday:
* at:
* do:
* should_run:
* _schedule_next_run:
Real User Cases
這邊介紹實際使用範例.
在 N 小時/分鐘 後執行並只一次
這個範例很像 Linux 命令 at 的功能, 簡單來說就是延遲一段時間後再執行某個 job. 這邊我們會繼承 Job 並客製成我們需要的功能 MyJob 類別:
- test_run_after.py
- #!/usr/bin/env python3
- import schedule
- import logging
- import functools
- import os
- import re
- import time
- from schedule import Job, CancelJob, IntervalError
- from datetime import datetime, timedelta
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(os.path.basename(__file__))
- logger.setLevel(20)
- class MyJob(Job):
- def __init__(self, scheduler=None):
- super(MyJob, self).__init__(1, scheduler)
- self.regex = re.compile(r'((?P
\d+?)hr)?((?P )\d+?)m)?((?P \d+?)s)?' - def parse_time(self, time_str):
- # https://stackoverflow.com/questions/4628122/how-to-construct-a-timedelta-object-from-a-simple-string
- parts = self.regex.match(time_str)
- if not parts:
- raise IntervalError()
- parts = parts.groupdict()
- time_params = {}
- for (name, param) in parts.items():
- if param:
- time_params[name] = int(param)
- return timedelta(**time_params)
- def do(self, job_func, *args, **kwargs):
- self.job_func = functools.partial(job_func, *args, **kwargs)
- try:
- functools.update_wrapper(self.job_func, job_func)
- except AttributeError:
- # job_funcs already wrapped by functools.partial won't have
- # __name__, __module__ or __doc__ and the update_wrapper()
- # call will fail.
- pass
- self.scheduler.jobs.append(self)
- return self
- def after(self, atime):
- if isinstance(atime, timedelta):
- self.next_run = datetime.now() + atime
- elif isinstance(atime, str):
- times = atime.split(':')
- if len(times) == 3: # HH:MM:SS
- self.next_run = datetime.now() + timedelta(hours=int(times[0]), minutes=int(times[1]), seconds=int(times[2]))
- else:
- self.next_run = datetime.now() + self.parse_time(atime)
- else:
- raise IntervalError()
- return self
- def run(self):
- logger.info('Running job %s', self)
- ret = self.job_func()
- self.last_run = datetime.now()
- return CancelJob()
- def main():
- def work():
- logger.info('Work done at {}'.format(datetime.now()))
- myjob = MyJob(schedule.default_scheduler)
- myjob.after('2m').do(work) # Do work after 2 minutes
- logger.info('Now is {}'.format(datetime.now()))
- while len(schedule.default_scheduler.jobs) > 0:
- schedule.run_pending()
- time.sleep(1)
- logger.info('All job done!')
- if __name__ == '__main__':
- main()
Supplement
* 鳥哥私房菜 - 第十五章、例行性工作排程(crontab)
沒有留言:
張貼留言