Friday, February 19, 2021

[ Python Article Collection ] Scheduling All Kinds of Recurring Jobs with Python

 Source From Here

Preface
Let’s explore all the libraries for running cron jobs, deferred tasks, recurring tasks or any other scheduled jobs in Python.

With Python there are always plenty of libraries and options for solving any particular problem, and running scheduled or recurring jobs is no exception. Whether you want to run a simple deferred task, a bunch of scheduled jobs, or manage crontabs, there’s a specialized library for that in Python. So, in this article, I will give you an overview of all the options available to help you choose the right tool for the task at hand, as well as their use cases, including an intro and basic examples to get you started quickly!

The Builtin Solution
Before exploring any external libraries, let’s first check what we have in Python's standard library. Most of the time, the standard library will contain the solution to whatever problem you might have, and if the problem is running deferred jobs, as with the Linux at command, then grabbing the sched module might be the way to go.

sched is a very simple module, which can be used to schedule one-off tasks for some specified time. So it's important to realize that this is not a recurring job (like a cron job). It works on all platforms, which might seem obvious, but will not necessarily be the case with all the libraries shown later.

One use case for such deferred tasks is a scheduled shutdown. Or, if you are working with network connections or a firewall, you can create a one-time job to revert changes in case you mess up and lock yourself out of the system.

Enough talking, let’s see an example:
import sched
import threading
import time
from datetime import datetime

scheduler = sched.scheduler(time.time, time.sleep)

def some_deferred_task(name):
    print(f'Event time: {datetime.now()} ({name})')

print(f'Start: {datetime.now()}')

#      delay in seconds -----v  v----- priority
event_1_id = scheduler.enter(2, 2, some_deferred_task, ('first',))

# If the first 2 events are due at exactly the same time, "second" runs first
event_2_id = scheduler.enter(2, 1, some_deferred_task, ('second',))
event_3_id = scheduler.enter(5, 1, some_deferred_task, ('third',))

# Start a thread to run the events
t = threading.Thread(target=scheduler.run)
t.start()

# Cancel the "second" event from the main thread before it fires
scheduler.cancel(event_2_id)

# Wait for the thread to finish once the remaining tasks have run
t.join()
Execution:
# python test.py
Start: 2021-02-18 20:53:59.191868
Event time: 2021-02-18 20:54:01.204007 (first)
Event time: 2021-02-18 20:54:04.197989 (third)

The code above defines scheduler, which is used to create (.enter) events to be executed at a later time. Each event (call to .enter) receives 4 arguments, which are:
* delay in seconds (in how many seconds will the event happen?)
* priority (a smaller number means a higher priority)
* name of the function to be called
* optional function arguments.

The priority argument doesn't matter most of the time, but it can be very important if 2 events are scheduled to happen at exactly the same time, yet they have to be executed sequentially. In that case, the event with the highest priority (lowest number) goes first.
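To make the tie-breaking rule concrete, here is a minimal sketch using only the standard library. The FakeClock class is a hypothetical helper (not part of sched) that simulates time so the example finishes instantly, and enterabs is used instead of enter so that two events share exactly the same due time.

```python
import sched


class FakeClock:
    """Hypothetical stand-in clock so the example runs without real waiting."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        # "Sleeping" just moves the simulated time forward.
        self.now += seconds


clock = FakeClock()
scheduler = sched.scheduler(clock.time, clock.sleep)
order = []

# enterabs(time, priority, action, argument) schedules at an absolute time.
scheduler.enterabs(10, 2, order.append, ('low priority',))
scheduler.enterabs(10, 1, order.append, ('high priority',))
scheduler.enterabs(5, 9, order.append, ('earlier time',))

scheduler.run()
print(order)  # ['earlier time', 'high priority', 'low priority']
```

The due time always wins; priority only decides the order among events that are due at the same moment.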

In this code snippet we can also see that the .enter method returns an event object, which serves as the event's ID. These IDs can be used to cancel events, as demonstrated with scheduler.cancel(event_2_id).

To avoid blocking the main thread of the program, we also used threading.Thread to run the scheduler and called .join() on it to wait until it's done with all the tasks.
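As a small illustration of cancelling, the sketch below schedules two events, cancels one before running the scheduler, and inspects the read-only queue attribute to see what is still pending. The delays and labels are arbitrary values chosen for the example.

```python
import sched
import time

scheduler = sched.scheduler(time.monotonic, time.sleep)
ran = []

kept = scheduler.enter(0.01, 1, ran.append, ('kept',))
dropped = scheduler.enter(0.02, 1, ran.append, ('dropped',))

# cancel() raises ValueError if the event is no longer in the queue.
scheduler.cancel(dropped)

# scheduler.queue lists the upcoming events in the order they will run.
pending = [event.argument[0] for event in scheduler.queue]

scheduler.run()  # blocks until the queue is empty
print(pending, ran)
```

Only the event that was not cancelled is still queued, and only that one runs.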

Full Power of Crontab
There are quite a few libraries for running recurring jobs using Python, but let’s start with the one that gives you the full cron “experience”. This library is called python-crontab and can be installed with pip install python-crontab.

python-crontab, unlike other libraries and modules listed here, creates and manages actual real crontabs on Unix systems and tasks on Windows. Therefore, it's not emulating the behavior of these operating system tools, but rather leveraging them and using what's already there.

For an example here, let’s look at a practical use case. A common reason for running recurring tasks is checking the status of a database server. This can generally be done by connecting and logging in to the database and running a dummy query like SELECT 1, like so:
from crontab import CronTab

# user=True denotes the current user
cron = CronTab(user=True)
job = cron.new(command='PGPASSWORD=test psql -U someuser -d somedb -c "SELECT 1" -h localhost')
job.setall("*/5 * * * *")

if job.is_valid():  # If the new job's syntax is valid, write it to the crontab
    cron.write()
You can check the registered crontab job by:
$ crontab -l # Check real crontab from shell
*/5 * * * * PGPASSWORD=test psql -U someuser -d somedb -c "SELECT 1" -h localhost

As I previously mentioned, python-crontab provides the real cron "experience", which includes the generally disliked cron syntax. To set the schedule, one uses the .setall method to set all the fields. Before setting the schedule, however, we need to create the crontab using CronTab() and specify the owning user. If True is passed in, the ID of the user executing the program is used. We also have to create an individual job (.new()) in this crontab, passing in the command to be executed and, optionally, a comment.

When we have the crontab and its job ready, we need to write it, but it’s a good idea to check its syntax using .is_valid() before we do so.
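If the cron syntax itself is unfamiliar, the following sketch may help when reading the schedules in this section. It is a deliberately simplified, hypothetical helper (expand_field is not part of python-crontab) that expands one numeric cron field into the values it matches; it handles *, ranges, lists and steps, but not names such as JAN or special strings such as @reboot.

```python
def expand_field(field, low, high):
    """Expand one numeric cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(','):
        span, _, step = part.partition('/')
        step = int(step) if step else 1
        if span == '*':
            start, end = low, high
        elif '-' in span:
            start, end = (int(x) for x in span.split('-'))
        else:
            start = end = int(span)
        values.update(range(start, end + 1, step))
    return sorted(values)


# Field order: minute, hour, day of month, month, day of week
minute, hour, day, month, weekday = "0 1 */2 * *".split()
print(expand_field(minute, 0, 59))     # [0]
print(expand_field(hour, 0, 23))       # [1]
print(expand_field(day, 1, 31)[:4])    # [1, 3, 5, 7]
print(expand_field("*/5", 0, 59)[:4])  # [0, 5, 10, 15]
```

Read this way, "*/5 * * * *" means every fifth minute of every hour, and "0 1 */2 * *" means 01:00 on every second day of the month.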

Another basic database admin task is the creation of periodic backups, which can also be done easily with python-crontab, this time with a slightly different syntax:
from crontab import CronTab

with CronTab(user='root') as cron:  # with the context manager, cron.write() is called automatically
    job = cron.new(
        command='PGPASSWORD=test pg_dump -U someuser -d somedb -h localhost --column-inserts --data-only > backup.sql',
        comment="Perform database backup"
    )
    job.every(2).days()
    job.hour.on(1)

    # job.every_reboot()
    # job.hour.every(10)
    # job.month.during('JAN', 'FEB')  # Powerful but confusing/hard to parse syntax
    # job.minute.during(15, 45).every(5)

# crontab -l
# 0 1 */2 * * PGPASSWORD=test pg_dump -U someuser -d somedb -h localhost --column-inserts --data-only > backup.sql # Perform database backup
If you’re not super comfortable with cron syntax, this library also provides a declarative syntax, which is shown in the example above. This syntax is, in my opinion, very confusing and even harder to read and use than normal cron syntax, so I’d recommend sticking with cron syntax or choosing a different library (see next section).

Apart from the different syntax, we can also see the usage of a Python context manager, which allows us to omit the .write method shown previously. One more thing to keep in mind is that if you decide to run cron jobs as the root user (not recommended), as shown above, you will have to run the program with sudo.

This library also has other useful features apart from the basic creation and management of crontabs. One of them is listing and inspecting both user and system crontabs, as well as looking up jobs based on criteria like the command or comment of a specific job:
from crontabs import CronTabs

for cron in CronTabs():  # Get list of all user and system crontabs
    if cron.user:
        print(f'{cron.user} has following cron jobs:')
    else:
        print(f'{cron.filen} has following cron jobs:')
    for job in cron.crons:
        print(f'   {job.command}')

# martin has following cron jobs:
#    PGPASSWORD=test psql -U someuser -d somedb -c "SELECT 1" -h localhost
#    PGPASSWORD=test pg_dump -U someuser -d somedb -h localhost --column-inserts --data-only > backup.sql
# /etc/cron.d/anacron has following cron jobs:
#    [ -x /etc/init.d/anacron ] && if [ ! -d /run/systemd/system ]; then /usr/sbin/invoke-rc.d anacron start >/dev/null; fi
# /etc/cron.d/popularity-contest has following cron jobs:
#    test -x /etc/cron.daily/popularity-contest && /etc/cron.daily/popularity-contest --crond
# ...

jobs = CronTabs().all.find_command('psql')  # lookup for all jobs running specific command

for job in jobs:
    print(job)

# */5 * * * * PGPASSWORD=test psql -U someuser -d somedb -c "SELECT 1" -h localhost
As I mentioned in the previous section, not all libraries shown here work exactly the same way on all platforms. python-crontab works on Linux and Windows, but on Windows only user crontabs (Windows tasks) are supported.

If You Really Hate Cron Syntax
We’ve seen how to schedule a job with declarative syntax with python-crontab in the previous section, but it wasn't really readable or user friendly. If you're looking for the most user friendly, most popular library with a very simple interface, then schedule is the library for you.

schedule is based on the article Rethinking Cron, which describes some of cron's problems and weaknesses, and this library does a good job of solving them. The biggest complaint about cron is definitely its terse and hard-to-write syntax, so let’s see how schedule addresses that:
import schedule

def task():
    return ...

def task_with_args(value):
    return ...

schedule.every(2).hours.do(task)
schedule.every().sunday.at("01:00").do(task)
schedule.every().hour.at(":15").do(task)
schedule.every(15).to(30).seconds.do(task)  # Randomly between every 15 to 30 seconds
schedule.every().minute.do(task_with_args, "some value")

# Grouping jobs with tags
schedule.every().day.at("09:00").do(task).tag('daily', 'morning')
schedule.every().day.at("18:30").do(task).tag('daily', 'evening')
schedule.every().hour.do(task).tag('hourly')

schedule.clear('daily')  # Cancel every job tagged 'daily'

# No explicit "month" schedule
# No explicit "range" (during 10-14h; from Jan to Mar) schedule
The first 5 scheduled jobs above don’t really need much of an explanation. The code is very human-readable and quite self-explanatory. The interface only contains a few attributes for days (.monday) and time units (.seconds, .hours, ...), which makes it very easy to use.

Apart from simple scheduling, the interface also contains a .tag() method for grouping jobs by tag. This can be useful, for example, for canceling whole groups of jobs (with .clear()).

One downside of having such a simple interface is the lack of explicit month or range scheduling, e.g. scheduling jobs during 10–14h or from Jan to Mar isn’t really possible.

Aside from recurring jobs, you can also use schedule to run one-off tasks and achieve the same effect as with sched, but with nicer syntax:
import time

import schedule

# Execute one-off (deferred job)
def deferred_job():
    # Do stuff...
    return schedule.CancelJob  # Returning CancelJob removes the job after its first run


schedule.every().day.at('01:00').do(deferred_job)

while True:
    schedule.run_pending()
    time.sleep(1)

# To run in background - https://github.com/mrhwick/schedule/blob/master/schedule/__init__.py#L63
Apart from the deferred job, this code snippet also shows that we need to keep the thread alive for the jobs to run. That’s because this library doesn’t create actual cron or at jobs. If you don't want to block the main thread of your program as in the example above, you can also run the scheduler in the background, as shown at the link in the snippet's last comment.
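One way to keep such a polling loop off the main thread is sketched below with the standard library only. The run_continuously helper and its interval parameter are assumptions made for this illustration; with the schedule library you would pass schedule.run_pending as the run_pending argument, while here a stand-in callable is used so the block runs without any third-party package.

```python
import threading
import time


def run_continuously(run_pending, interval=0.01):
    """Call run_pending() repeatedly in a daemon thread until the returned event is set."""
    stop_event = threading.Event()

    def loop():
        while not stop_event.is_set():
            run_pending()
            # wait() doubles as an interruptible sleep between polls
            stop_event.wait(interval)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return stop_event, thread


calls = []  # stand-in for real work: record each time the loop polls
stop_event, thread = run_continuously(lambda: calls.append(time.monotonic()))

time.sleep(0.1)   # the main thread is free to do other things here
stop_event.set()  # ask the background loop to finish
thread.join()
print(f'run_pending was called {len(calls)} times')
```

Using an Event instead of a bare while True loop gives the main thread a clean way to stop the scheduler.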

All The Features You Might Ever Need
All the previously mentioned tools have their pros and cons, some specific features and design that make them good for specific use cases. If, however, you need to run both deferred and periodic jobs, store jobs in a database, use built-in logging features, etc., then most likely none of the tools mentioned above are going to cut it.

The most feature-rich and powerful library for scheduling jobs of any kind in Python is definitely APScheduler, which stands for Advanced Python Scheduler.

It ticks all the boxes when it comes to the features mentioned above, and these kinds of features require extensive configuration, so let’s see how APScheduler does it:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    'mongo': MongoDBJobStore(),  # MongoDBJobStore requires PyMongo installed - `pip install pymongo`
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore requires SQLAlchemy installed, Recommended to use PostgreSQL
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores,
                                executors=executors,
                                job_defaults=job_defaults,
                                timezone=utc,
                                daemon=True)  # The scheduler runs in a daemon thread, so keep the main thread alive (e.g. with an infinite loop) or the program exits.
This code snippet shows a sample configuration that sets up SQLite and MongoDB job stores, which house the scheduled jobs. It also configures the executors that handle the running of jobs; here we specify the size of our pools. We also specify some job defaults, such as the number of job instances that can run in parallel. All the configs are passed to the scheduler, which is used to manage jobs.
from datetime import datetime
from apscheduler.triggers.cron import CronTrigger

def task():
    return ...

# trigger -> can also be 'cron' or 'date'
# misfire_grace_time -> seconds after the designated runtime that the job is still allowed to be run
# max_instances -> max number of concurrent instances
scheduler.add_job(task, trigger='interval', seconds=5, misfire_grace_time=600, max_instances=5)
# 'interval' trigger can take any args from https://apscheduler.readthedocs.io/en/latest/modules/triggers/interval.html#module-apscheduler.triggers.interval

scheduler.add_job(task, trigger='cron', month='jan-apr', day_of_week='mon-fri', hour=15, minute=30, end_date='2021-01-30')
scheduler.add_job(task, CronTrigger.from_crontab('0 17 * * sat,sun'))
# 'cron' trigger can take any args from https://apscheduler.readthedocs.io/en/latest/modules/triggers/cron.html#module-apscheduler.triggers.cron

# Simulates 'at' - deferred jobs
scheduler.add_job(task, trigger='date', run_date=datetime(2020, 12, 24, 17, 30, 0))
# 'date' trigger can take any args from https://apscheduler.readthedocs.io/en/latest/modules/triggers/date.html#module-apscheduler.triggers.date

scheduler.print_jobs(jobstore="default")

scheduler.start()

# Pending jobs:
#     task (trigger: interval[0:00:05], pending)
#     task (trigger: cron[month='jan-apr', day_of_week='mon-fri', hour='15', minute='30'], pending)
#     task (trigger: cron[month='*', day='*', day_of_week='sat,sun', hour='17', minute='0'], pending)
#     task (trigger: date[2020-12-24 17:30:00 UTC], pending)
Next comes the creation of our jobs using the .add_job() method. It takes quite a few arguments, the first of them being the function to be run. Next is the trigger type, which can be interval, cron or date. Interval schedules jobs to run periodically at the given interval. Cron is just the good old cron-like scheduler, which allows for both classic crontab expressions and keyword argument-based scheduling. Finally, the date trigger creates one-time jobs at a specific date and time.

One more important argument to .add_job() is misfire_grace_time, which provides an anacron-like feature. In other words, if your job doesn't run because the scheduler is down, the scheduler will try to run it when it's back up, as long as the misfire_grace_time hasn't been exceeded.

Scheduled jobs are generally annoying to debug. APScheduler tries to alleviate that with the ability to easily configure logging levels, as well as the ability to add listeners to scheduler events, e.g. when a job is executed or when a job fails. You can see such a listener and sample log output below:
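The grace-period rule can be pictured with a few lines of plain Python. This is only a simplified model of the idea described above, not APScheduler's code; the function name still_allowed_to_run and the sample timestamps are made up for the illustration.

```python
from datetime import datetime, timedelta


def still_allowed_to_run(scheduled_at, now, misfire_grace_time):
    """Return True if a late job is still within its grace period (in seconds)."""
    lateness = now - scheduled_at
    return lateness <= timedelta(seconds=misfire_grace_time)


scheduled_at = datetime(2021, 2, 19, 1, 0, 0)

# Scheduler came back 5 minutes late: inside the 600-second grace period
print(still_allowed_to_run(scheduled_at, scheduled_at + timedelta(minutes=5), 600))   # True

# Scheduler came back 15 minutes late: the run counts as missed
print(still_allowed_to_run(scheduled_at, scheduled_at + timedelta(minutes=15), 600))  # False
```

With misfire_grace_time=600 as in the snippet above, a job that is up to ten minutes late would still be run under this model.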
# Catching scheduler events:
import logging

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

logging.basicConfig(level=logging.INFO)

def my_listener(event):
    if event.exception:
        logging.warning('Job failed...')
    else:
        logging.info('Job was executed...')


scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# ...
# INFO:apscheduler.scheduler:Scheduler started
# INFO:apscheduler.executors.default:Running job "task (trigger: interval[0:00:05], next run at: 2020-10-30 09:51:11 UTC)" (scheduled at 2020-10-30 09:51:11.222311+00:00)
# INFO:apscheduler.executors.default:Job "task (trigger: interval[0:00:05], next run at: 2020-10-30 09:51:16 UTC)" executed successfully
# INFO:root:Job was executed...
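The EVENT_JOB_EXECUTED | EVENT_JOB_ERROR argument in the listener snippet is a bitmask that selects which events reach the listener. The toy model below shows how such a mask can filter events. Everything in it (the Event flag values and the ToyScheduler class) is a hypothetical stand-in written for this illustration and is not APScheduler's implementation.

```python
import enum


class Event(enum.IntFlag):
    # Hypothetical event codes; each one occupies its own bit.
    JOB_EXECUTED = 1
    JOB_ERROR = 2
    JOB_MISSED = 4


class ToyScheduler:
    """Minimal stand-in that only knows how to register and notify listeners."""

    def __init__(self):
        self._listeners = []

    def add_listener(self, callback, mask):
        self._listeners.append((callback, mask))

    def dispatch(self, code):
        for callback, mask in self._listeners:
            if code & mask:  # deliver only if the event's bit is in the mask
                callback(code)


seen = []
toy = ToyScheduler()
toy.add_listener(seen.append, Event.JOB_EXECUTED | Event.JOB_ERROR)

toy.dispatch(Event.JOB_EXECUTED)  # delivered
toy.dispatch(Event.JOB_MISSED)    # filtered out by the mask
toy.dispatch(Event.JOB_ERROR)     # delivered
print(seen)
```

Combining codes with | lets one listener subscribe to several event types at once.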
For The Gevent Users
Last and maybe actually the least (desirable solution) is to use Gevent. Not because Gevent is bad, but because it isn’t really built for running scheduled tasks. If, however, you’re already using Gevent in your application, it might make sense to use it to schedule jobs too.

If you aren’t familiar with Gevent, it is a concurrency library based on coroutines. It uses greenlets to provide pseudo-concurrency for running multiple tasks in a single OS thread. For a better understanding, let’s see a basic example:
# Plain Gevent without scheduler
import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import requests

# Note that we're using HTTPS, so this demonstrates that SSL works.
urls = [
    'https://www.google.com/',
    'https://www.twitter.com/',
    'https://www.python.org/'
]

def head_size(url):
    print(f'Starting {url}')
    data = requests.get(url).content
    print(f'{url}: {len(data)}')

jobs = [gevent.spawn(head_size, _url) for _url in urls]

gevent.wait(jobs)
This example shows how we can query multiple URLs concurrently using Gevent and its gevent.spawn. When it runs, all 3 jobs that were created start at the same(-ish) time and return their data later.

To perform the same task, but scheduled in the future, we can do the following:
# Continues the previous snippet (uses gevent, head_size and urls)

# deferred job (one-off): run head_size once, 30 seconds from now
gevent.spawn_later(30, head_size, urls[0])

# periodic job: runs func right away, then re-schedules itself every `delay` seconds
def schedule(delay, func, *args, **kw_args):
    gevent.spawn_later(0, func, *args, **kw_args)
    gevent.spawn_later(delay, schedule, delay, func, *args, **kw_args)

schedule(30, head_size, urls[0])

# periodic job as a plain loop
# this will drift...
def run_regularly(function, interval, *args, **kwargs):
    while True:
        gevent.sleep(interval)
        function(*args, **kwargs)


run_regularly(head_size, 30, urls[0])
Above we can see examples of running both one-off jobs and periodic ones. Both of these solutions are kind of a hack/trick and should only be considered if you’re already using Gevent in your application. It’s also important to mention that the above run_regularly function will slowly start to drift, even if we account for the runtime of tasks. Therefore, you should preferably use the GeventScheduler available in the APScheduler library instead, as it's a more robust solution.
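To see where the drift in a sleep-then-run loop comes from, here is a small simulation in plain Python. It does not use Gevent and keeps time as simple integers; the function names and the 30-second interval with a 2-second task are assumptions chosen for the illustration.

```python
def naive_start_times(interval, task_duration, runs):
    """Sleep a fixed interval, then run the task (the run_regularly pattern)."""
    now, starts = 0, []
    for _ in range(runs):
        now += interval       # sleep(interval)
        starts.append(now)    # the task starts
        now += task_duration  # the task runs
    return starts


def anchored_start_times(interval, task_duration, runs):
    """Sleep until the next slot on a fixed timeline instead."""
    now, next_run, starts = 0, interval, []
    for _ in range(runs):
        now = max(now, next_run)  # sleep only for the time left until the slot
        starts.append(now)
        now += task_duration
        next_run += interval
    return starts


print(naive_start_times(30, 2, 4))     # [30, 62, 94, 126]
print(anchored_start_times(30, 2, 4))  # [30, 60, 90, 120]
```

In the naive loop, every run pushes all later runs back by the task's duration. The simulation ignores timer inaccuracy and scheduling delays, which a real loop would still have to deal with.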

Supplement
Linux crontab: tutorial and examples for setting up recurring scheduled jobs
