diff --git a/frappe/core/doctype/scheduler_log/scheduler_log.json b/frappe/core/doctype/scheduler_log/scheduler_log.json index fb94e75991..4412a78a8a 100644 --- a/frappe/core/doctype/scheduler_log/scheduler_log.json +++ b/frappe/core/doctype/scheduler_log/scheduler_log.json @@ -3,6 +3,7 @@ "allow_import": 0, "allow_rename": 0, "autoname": "SCHLOG.#####", + "beta": 0, "creation": "2013-01-16 13:09:40", "custom": 0, "description": "Log of Scheduler Errors", @@ -53,7 +54,7 @@ "permlevel": 0, "print_hide": 0, "print_hide_if_no_value": 0, - "read_only": 0, + "read_only": 1, "report_hide": 0, "reqd": 0, "search_index": 0, @@ -77,7 +78,7 @@ "permlevel": 0, "print_hide": 0, "print_hide_if_no_value": 0, - "read_only": 0, + "read_only": 1, "report_hide": 0, "reqd": 0, "search_index": 0, @@ -89,13 +90,14 @@ "hide_toolbar": 0, "icon": "icon-warning-sign", "idx": 1, + "image_view": 0, "in_create": 0, "in_dialog": 0, "is_submittable": 0, "issingle": 0, "istable": 0, "max_attachments": 0, - "modified": "2016-02-22 09:35:31.852571", + "modified": "2016-07-03 14:24:13.581374", "modified_by": "Administrator", "module": "Core", "name": "Scheduler Log", @@ -122,6 +124,9 @@ "write": 1 } ], + "quick_entry": 1, "read_only": 0, - "read_only_onload": 0 + "read_only_onload": 0, + "sort_order": "ASC", + "track_seen": 0 } \ No newline at end of file diff --git a/frappe/exceptions.py b/frappe/exceptions.py index 56d066e43e..2bd5e94851 100644 --- a/frappe/exceptions.py +++ b/frappe/exceptions.py @@ -61,3 +61,4 @@ class UniqueValidationError(ValidationError): pass class AppNotInstalledError(ValidationError): pass class IncorrectSitePath(NotFound): pass class ImplicitCommitError(ValidationError): pass +class RetryBackgroundJobError(Exception): pass diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index b0b0b39604..e292e03e51 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -4,7 +4,8 @@ from rq import Connection, Queue, Worker from frappe.utils import cstr from collections import defaultdict import frappe -import os, socket +import MySQLdb +import os, socket, time default_timeout = 300 queue_timeout = { @@ -26,7 +27,7 @@ def enqueue(method, queue='default', timeout=300, event=None, :param job_name: can be used to name an enqueue call, which can be used to prevent duplicate calls :param kwargs: keyword arguments to be passed to the method ''' - q = get_queue(queue) + q = get_queue(queue, async=async) if not timeout: timeout = queue_timeout.get(queue) or 300 @@ -36,30 +37,56 @@ def enqueue(method, queue='default', timeout=300, event=None, "method": method, "event": event, "job_name": job_name or cstr(method), - "kwargs":kwargs + "async": async, + "kwargs": kwargs }) -def execute_job(site, method, event, job_name, kwargs): +def execute_job(site, method, event, job_name, kwargs, async=True, retry=0): '''Executes job in a worker, performs commit/rollback and logs if there is any error''' from frappe.utils.scheduler import log - frappe.connect(site) + + if async: + frappe.connect(site) if isinstance(method, basestring): method_name = method method = frappe.get_attr(method) else: - method_name = cstr(method) + method_name = cstr(method.__name__) try: method(**kwargs) + + except (MySQLdb.OperationalError, frappe.RetryBackgroundJobError), e: + frappe.db.rollback() + + if (retry < 5 and + (isinstance(e, frappe.RetryBackgroundJobError) or e.args[0] in (1213, 1205))): + # retry the job if + # 1213 = deadlock + # 1205 = lock wait timeout + # or RetryBackgroundJobError is explicitly raised + frappe.destroy() + time.sleep(retry+1) + + return execute_job(site, method, event, job_name, kwargs, + async=async, retry=retry+1) + + else: + log(method_name, message=repr(locals())) + raise + except: frappe.db.rollback() - log(method_name) + log(method_name, message=repr(locals())) raise + else: frappe.db.commit() + finally: - frappe.destroy() + if async: + frappe.destroy() def start_worker(queue=None): '''Wrapper to start rq worker. Connects to redis and monitors these queues.''' @@ -120,11 +147,11 @@ def get_queue_list(queue_list=None): else: return default_queue_list -def get_queue(queue): +def get_queue(queue, async=True): '''Returns a Queue object tied to a redis connection''' validate_queue(queue) - return Queue(queue, connection=get_redis_conn()) + return Queue(queue, connection=get_redis_conn(), async=async) def validate_queue(queue, default_queue_list=None): if not default_queue_list: diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 1f680f3d05..c3a87aab90 100755 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -66,8 +66,6 @@ def enqueue_events_for_site(site, queued_jobs): enqueue_events(site=site, queued_jobs=queued_jobs) - # TODO this print call is a tempfix till logging is fixed! - print 'Queued events for site {0}'.format(site) frappe.logger(__name__).debug('Queued events for site {0}'.format(site)) except: