From 299ab6198a76eec3267f6e2b0eb1e7fd06ea46d0 Mon Sep 17 00:00:00 2001 From: joezsweet Date: Fri, 27 Oct 2017 11:18:48 +0200 Subject: [PATCH] Cron like events scheduler (#4339) * - trigger new "cron" event - check cron string syntax - added croniter to requirements * - run scheduler ever 60 sec - trigger all enabled events - enqueue if now >= next time execution since last one * Update task-runner.md * fixed tests * fix triggering with now = True * modified sobstitution to cron_map modified annually label to annual * ability to use labels defined in cron_map in cron string definition --- frappe/docs/user/en/tutorial/task-runner.md | 21 +++- frappe/tests/test_scheduler.py | 4 +- frappe/utils/scheduler.py | 118 +++++++++++--------- requirements.txt | 2 +- 4 files changed, 86 insertions(+), 59 deletions(-) diff --git a/frappe/docs/user/en/tutorial/task-runner.md b/frappe/docs/user/en/tutorial/task-runner.md index 0afdfeaa1b..a700f1cafd 100755 --- a/frappe/docs/user/en/tutorial/task-runner.md +++ b/frappe/docs/user/en/tutorial/task-runner.md @@ -1,8 +1,8 @@ # Scheduled Tasks -Finally, an application also has to send email notifications and do other kind of scheduled tasks. In Frappé, if you have setup the bench, the task / scheduler is setup via Celery using Redis Queue. +Finally, an application also has to send email notifications and do other kind of scheduled tasks. In Frappé, if you have setup the bench, the task / scheduler is setup via RQ using Redis Queue. -To add a new task handler, go to `hooks.py` and add a new handler. Default handlers are `all`, `daily`, `weekly`, `monthly`. The `all` handler is called every 3 minutes by default. +To add a new task handler, go to `hooks.py` and add a new handler. Default handlers are `all`, `daily`, `weekly`, `monthly`, `cron`. The `all` handler is called every 4 minutes by default. # Scheduled Tasks # --------------- @@ -11,6 +11,15 @@ To add a new task handler, go to `hooks.py` and add a new handler. Default handl "daily": [ "library_management.tasks.daily" ], + "cron": { + "0/10 * * * *": [ + "library_management.task.run_every_ten_mins" + ], + "15 18 * * *": [ + "library_management.task.every_day_at_18_15" + ] + } + } Here we can point to a Python function and that function will be executed every day. Let us look what this function looks like: @@ -21,6 +30,14 @@ Here we can point to a Python function and that function will be executed every from __future__ import unicode_literals import frappe from frappe.utils import datediff, nowdate, format_date, add_days + + def every_ten_minutes(): + # stuff to do every 10 minutes + pass + + def every_day_at_18_15(): + # stuff to do every day at 6:15pm + pass def daily(): loan_period = frappe.db.get_value("Library Management Settings", diff --git a/frappe/tests/test_scheduler.py b/frappe/tests/test_scheduler.py index ce508c3e71..4ca5feaa29 100644 --- a/frappe/tests/test_scheduler.py +++ b/frappe/tests/test_scheduler.py @@ -33,7 +33,7 @@ class TestScheduler(TestCase): next_event = last_event + relativedelta(minutes=30) enqueue_applicable_events(frappe.local.site, next_event, last_event) - self.assertFalse("all" in frappe.flags.ran_schedulers) + self.assertFalse("cron" in frappe.flags.ran_schedulers) # maintain last_event and next_event on the same day last_event = now_datetime().replace(hour=0, minute=0, second=0, microsecond=0) @@ -55,7 +55,7 @@ class TestScheduler(TestCase): enqueue_applicable_events(frappe.local.site, next_event, last_event) self.assertTrue("all" in frappe.flags.ran_schedulers) - self.assertTrue("hourly" in frappe.flags.ran_schedulers) + self.assertFalse("hourly" in frappe.flags.ran_schedulers) def test_restrict_scheduler_events(self): diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 68ab6f7cc2..aa024b2419 100755 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -25,15 +25,30 @@ from frappe.utils.data import get_datetime, now_datetime from frappe.core.doctype.user.user import STANDARD_USERS from frappe.installer import update_site_config from six import string_types +from croniter import croniter DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' +cron_map = { + "yearly": "0 0 1 1 *", + "annual": "0 0 1 1 *", + "monthly": "0 0 1 * *", + "monthly_long": "0 0 1 * *", + "weekly": "0 0 * * 0", + "weekly_long": "0 0 * * 0", + "daily": "0 0 * * *", + "daily_long": "0 0 * * *", + "midnight": "0 0 * * *", + "hourly": "0 * * * *", + "hourly_long": "0 * * * *", + "all": "0/" + str((frappe.get_conf().scheduler_interval or 240) // 60) + " * * * *", +} + def start_scheduler(): '''Run enqueue_events_for_all_sites every 2 minutes (default). Specify scheduler_interval in seconds in common_site_config.json''' - interval = frappe.get_conf().scheduler_interval or 240 - schedule.every(interval).seconds.do(enqueue_events_for_all_sites) + schedule.every(60).seconds.do(enqueue_events_for_all_sites) while True: schedule.run_pending() @@ -105,64 +120,59 @@ def enqueue_applicable_events(site, nowtime, last, queued_jobs=()): enabled_events = get_enabled_scheduler_events() - def trigger_if_enabled(site, event): - if event in enabled_events: - trigger(site, event, queued_jobs) - _log(event) + def trigger_if_enabled(site, event, last, queued_jobs): + trigger(site, event, last, queued_jobs) + _log(event) def _log(event): out.append("{time} - {event} - queued".format(time=nowtime_str, event=event)) - if nowtime.day != last.day: - # if first task of the day execute daily tasks - trigger_if_enabled(site, "daily") - trigger_if_enabled(site, "daily_long") - - if nowtime.month != last.month: - trigger_if_enabled(site, "monthly") - trigger_if_enabled(site, "monthly_long") - - if nowtime.weekday()==0: - trigger_if_enabled(site, "weekly") - trigger_if_enabled(site, "weekly_long") - - if "all" not in enabled_events: - trigger(site, "all", queued_jobs) - - if "hourly" not in enabled_events: - trigger(site, "hourly", queued_jobs) - - if nowtime.hour != last.hour: - trigger_if_enabled(site, "hourly") - trigger_if_enabled(site, "hourly_long") - - if "all" not in enabled_events: - trigger(site, "all", queued_jobs) + for event in enabled_events: + trigger_if_enabled(site, event, last, queued_jobs) - trigger_if_enabled(site, "all") + if "all" not in enabled_events: + trigger_if_enabled(site, "all", last, queued_jobs) return out -def trigger(site, event, queued_jobs=(), now=False): - """trigger method in hooks.scheduler_events""" - queue = 'long' if event.endswith('_long') else 'short' - timeout = queue_timeout[queue] - if not queued_jobs and not now: - queued_jobs = get_jobs(site=site, queue=queue) - - if frappe.flags.in_test: - frappe.flags.ran_schedulers.append(event) - - events = get_scheduler_events(event) - if not events: - return - - for handler in events: - if not now: - if handler not in queued_jobs: - enqueue(handler, queue, timeout, event) - else: - scheduler_task(site=site, event=event, handler=handler, now=True) +def trigger(site, event, last=None, queued_jobs=(), now=False): + """Trigger method in hooks.scheduler_events.""" + + queue = 'long' if event.endswith('_long') else 'short' + timeout = queue_timeout[queue] + if not queued_jobs and not now: + queued_jobs = get_jobs(site=site, queue=queue) + + if frappe.flags.in_test: + frappe.flags.ran_schedulers.append(event) + + events_from_hooks = get_scheduler_events(event) + if not events_from_hooks: + return + + events = events_from_hooks + if not now: + events = [] + if event == "cron": + for e in events_from_hooks: + e = cron_map.get(e, e) + if croniter.is_valid(e): + if croniter(e, last).get_next(datetime) <= frappe.utils.now_datetime(): + events.extend(events_from_hooks[e]) + else: + frappe.log_error("Cron string " + e + " is not valid", "Error triggering cron job") + frappe.logger(__name__).error('Exception in Trigger Events for Site {0}, Cron String {1}'.format(site, e)) + + else: + if croniter(cron_map[event], last).get_next(datetime) <= frappe.utils.now_datetime(): + events.extend(events_from_hooks) + + for handler in events: + if not now: + if handler not in queued_jobs: + enqueue(handler, queue, timeout, event) + else: + scheduler_task(site=site, event=event, handler=handler, now=True) def get_scheduler_events(event): '''Get scheduler events from hooks and integrations''' @@ -205,7 +215,7 @@ def get_enabled_scheduler_events(): return enabled_events return ["all", "hourly", "hourly_long", "daily", "daily_long", - "weekly", "weekly_long", "monthly", "monthly_long"] + "weekly", "weekly_long", "monthly", "monthly_long", "cron"] def is_scheduler_disabled(): if frappe.conf.disable_scheduler: @@ -293,7 +303,7 @@ def restrict_scheduler_events_if_dormant(): update_site_config('dormant', True) def restrict_scheduler_events(*args, **kwargs): - val = json.dumps(["hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long"]) + val = json.dumps(["hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long", "cron"]) frappe.db.set_global('enabled_scheduler_events', val) def is_dormant(since = 345600): diff --git a/requirements.txt b/requirements.txt index cf72c3109d..3312d232b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -50,4 +50,4 @@ pyqrcode pypng premailer psycopg2 - +croniter