Selaa lähdekoodia

Cron like events scheduler (#4339)

* - trigger new "cron" event
- check cron string syntax
- added croniter to requirements

* - run scheduler ever 60 sec
- trigger all enabled events
- enqueue if now >= next time execution since last one

* Update task-runner.md

* fixed tests

* fix triggering with now = True

* modified sobstitution to cron_map
modified annually label to annual

* ability to use labels defined in cron_map in cron string definition
version-14
joezsweet 7 vuotta sitten
committed by Rushabh Mehta
vanhempi
commit
299ab6198a
4 muutettua tiedostoa jossa 86 lisäystä ja 59 poistoa
  1. +19
    -2
      frappe/docs/user/en/tutorial/task-runner.md
  2. +2
    -2
      frappe/tests/test_scheduler.py
  3. +64
    -54
      frappe/utils/scheduler.py
  4. +1
    -1
      requirements.txt

+ 19
- 2
frappe/docs/user/en/tutorial/task-runner.md Näytä tiedosto

@@ -1,8 +1,8 @@
# Scheduled Tasks

Finally, an application also has to send email notifications and do other kind of scheduled tasks. In Frappé, if you have setup the bench, the task / scheduler is setup via Celery using Redis Queue.
Finally, an application also has to send email notifications and do other kind of scheduled tasks. In Frappé, if you have setup the bench, the task / scheduler is setup via RQ using Redis Queue.

To add a new task handler, go to `hooks.py` and add a new handler. Default handlers are `all`, `daily`, `weekly`, `monthly`. The `all` handler is called every 3 minutes by default.
To add a new task handler, go to `hooks.py` and add a new handler. Default handlers are `all`, `daily`, `weekly`, `monthly`, `cron`. The `all` handler is called every 4 minutes by default.

# Scheduled Tasks
# ---------------
@@ -11,6 +11,15 @@ To add a new task handler, go to `hooks.py` and add a new handler. Default handl
"daily": [
"library_management.tasks.daily"
],
"cron": {
"0/10 * * * *": [
"library_management.task.run_every_ten_mins"
],
"15 18 * * *": [
"library_management.task.every_day_at_18_15"
]
}
}

Here we can point to a Python function and that function will be executed every day. Let us look what this function looks like:
@@ -21,6 +30,14 @@ Here we can point to a Python function and that function will be executed every
from __future__ import unicode_literals
import frappe
from frappe.utils import datediff, nowdate, format_date, add_days
def every_ten_minutes():
# stuff to do every 10 minutes
pass
def every_day_at_18_15():
# stuff to do every day at 6:15pm
pass

def daily():
loan_period = frappe.db.get_value("Library Management Settings",


+ 2
- 2
frappe/tests/test_scheduler.py Näytä tiedosto

@@ -33,7 +33,7 @@ class TestScheduler(TestCase):
next_event = last_event + relativedelta(minutes=30)

enqueue_applicable_events(frappe.local.site, next_event, last_event)
self.assertFalse("all" in frappe.flags.ran_schedulers)
self.assertFalse("cron" in frappe.flags.ran_schedulers)

# maintain last_event and next_event on the same day
last_event = now_datetime().replace(hour=0, minute=0, second=0, microsecond=0)
@@ -55,7 +55,7 @@ class TestScheduler(TestCase):

enqueue_applicable_events(frappe.local.site, next_event, last_event)
self.assertTrue("all" in frappe.flags.ran_schedulers)
self.assertTrue("hourly" in frappe.flags.ran_schedulers)
self.assertFalse("hourly" in frappe.flags.ran_schedulers)


def test_restrict_scheduler_events(self):


+ 64
- 54
frappe/utils/scheduler.py Näytä tiedosto

@@ -25,15 +25,30 @@ from frappe.utils.data import get_datetime, now_datetime
from frappe.core.doctype.user.user import STANDARD_USERS
from frappe.installer import update_site_config
from six import string_types
from croniter import croniter

DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

cron_map = {
"yearly": "0 0 1 1 *",
"annual": "0 0 1 1 *",
"monthly": "0 0 1 * *",
"monthly_long": "0 0 1 * *",
"weekly": "0 0 * * 0",
"weekly_long": "0 0 * * 0",
"daily": "0 0 * * *",
"daily_long": "0 0 * * *",
"midnight": "0 0 * * *",
"hourly": "0 * * * *",
"hourly_long": "0 * * * *",
"all": "0/" + str((frappe.get_conf().scheduler_interval or 240) // 60) + " * * * *",
}

def start_scheduler():
'''Run enqueue_events_for_all_sites every 2 minutes (default).
Specify scheduler_interval in seconds in common_site_config.json'''

interval = frappe.get_conf().scheduler_interval or 240
schedule.every(interval).seconds.do(enqueue_events_for_all_sites)
schedule.every(60).seconds.do(enqueue_events_for_all_sites)

while True:
schedule.run_pending()
@@ -105,64 +120,59 @@ def enqueue_applicable_events(site, nowtime, last, queued_jobs=()):

enabled_events = get_enabled_scheduler_events()

def trigger_if_enabled(site, event):
if event in enabled_events:
trigger(site, event, queued_jobs)
_log(event)
def trigger_if_enabled(site, event, last, queued_jobs):
trigger(site, event, last, queued_jobs)
_log(event)

def _log(event):
out.append("{time} - {event} - queued".format(time=nowtime_str, event=event))

if nowtime.day != last.day:
# if first task of the day execute daily tasks
trigger_if_enabled(site, "daily")
trigger_if_enabled(site, "daily_long")

if nowtime.month != last.month:
trigger_if_enabled(site, "monthly")
trigger_if_enabled(site, "monthly_long")

if nowtime.weekday()==0:
trigger_if_enabled(site, "weekly")
trigger_if_enabled(site, "weekly_long")

if "all" not in enabled_events:
trigger(site, "all", queued_jobs)

if "hourly" not in enabled_events:
trigger(site, "hourly", queued_jobs)

if nowtime.hour != last.hour:
trigger_if_enabled(site, "hourly")
trigger_if_enabled(site, "hourly_long")

if "all" not in enabled_events:
trigger(site, "all", queued_jobs)
for event in enabled_events:
trigger_if_enabled(site, event, last, queued_jobs)

trigger_if_enabled(site, "all")
if "all" not in enabled_events:
trigger_if_enabled(site, "all", last, queued_jobs)

return out

def trigger(site, event, queued_jobs=(), now=False):
"""trigger method in hooks.scheduler_events"""
queue = 'long' if event.endswith('_long') else 'short'
timeout = queue_timeout[queue]
if not queued_jobs and not now:
queued_jobs = get_jobs(site=site, queue=queue)

if frappe.flags.in_test:
frappe.flags.ran_schedulers.append(event)

events = get_scheduler_events(event)
if not events:
return

for handler in events:
if not now:
if handler not in queued_jobs:
enqueue(handler, queue, timeout, event)
else:
scheduler_task(site=site, event=event, handler=handler, now=True)
def trigger(site, event, last=None, queued_jobs=(), now=False):
"""Trigger method in hooks.scheduler_events."""

queue = 'long' if event.endswith('_long') else 'short'
timeout = queue_timeout[queue]
if not queued_jobs and not now:
queued_jobs = get_jobs(site=site, queue=queue)

if frappe.flags.in_test:
frappe.flags.ran_schedulers.append(event)

events_from_hooks = get_scheduler_events(event)
if not events_from_hooks:
return

events = events_from_hooks
if not now:
events = []
if event == "cron":
for e in events_from_hooks:
e = cron_map.get(e, e)
if croniter.is_valid(e):
if croniter(e, last).get_next(datetime) <= frappe.utils.now_datetime():
events.extend(events_from_hooks[e])
else:
frappe.log_error("Cron string " + e + " is not valid", "Error triggering cron job")
frappe.logger(__name__).error('Exception in Trigger Events for Site {0}, Cron String {1}'.format(site, e))

else:
if croniter(cron_map[event], last).get_next(datetime) <= frappe.utils.now_datetime():
events.extend(events_from_hooks)

for handler in events:
if not now:
if handler not in queued_jobs:
enqueue(handler, queue, timeout, event)
else:
scheduler_task(site=site, event=event, handler=handler, now=True)

def get_scheduler_events(event):
'''Get scheduler events from hooks and integrations'''
@@ -205,7 +215,7 @@ def get_enabled_scheduler_events():
return enabled_events

return ["all", "hourly", "hourly_long", "daily", "daily_long",
"weekly", "weekly_long", "monthly", "monthly_long"]
"weekly", "weekly_long", "monthly", "monthly_long", "cron"]

def is_scheduler_disabled():
if frappe.conf.disable_scheduler:
@@ -293,7 +303,7 @@ def restrict_scheduler_events_if_dormant():
update_site_config('dormant', True)

def restrict_scheduler_events(*args, **kwargs):
val = json.dumps(["hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long"])
val = json.dumps(["hourly", "hourly_long", "daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long", "cron"])
frappe.db.set_global('enabled_scheduler_events', val)

def is_dormant(since = 345600):


+ 1
- 1
requirements.txt Näytä tiedosto

@@ -50,4 +50,4 @@ pyqrcode
pypng
premailer
psycopg2
croniter

Ladataan…
Peruuta
Tallenna