diff --git a/frappe/__init__.py b/frappe/__init__.py index 8d558220ff..141da8f48b 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -173,6 +173,8 @@ def get_site_config(sites_path=None, site_path=None): site_config = os.path.join(site_path, "site_config.json") if os.path.exists(site_config): config.update(get_file_json(site_config)) + elif local.site: + raise IncorrectSitePath, "{0} does not exist".format(site_config) return _dict(config) diff --git a/frappe/commands.py b/frappe/commands.py index 0a19a220dd..1e0551dfe3 100644 --- a/frappe/commands.py +++ b/frappe/commands.py @@ -828,7 +828,6 @@ def request(context, args): def doctor(): "Get diagnostic info about background workers" from frappe.utils.doctor import doctor as _doctor - frappe.init('') return _doctor() @click.command('celery-doctor') @@ -839,20 +838,23 @@ def celery_doctor(site=None): frappe.init('') return _celery_doctor(site=site) -@click.command('purge-all-tasks') -def purge_all_tasks(): - "Purge any pending periodic tasks of 'all' event. Doesn't purge hourly, daily and weekly" - frappe.init('') +@click.command('purge-pending-tasks') +@click.option('--site', help='site name') +@click.option('--event', default=None, help='one of "all", "weekly", "monthly", "hourly", "daily", "weekly_long", "daily_long"') +def purge_all_tasks(site=None, event=None): + "Purge any pending periodic tasks, if event option is not given, it will purge everything for the site" from frappe.utils.doctor import purge_pending_tasks - count = purge_pending_tasks() + frappe.init(site or '') + count = purge_pending_tasks(event=None, site=None) print "Purged {} tasks".format(count) @click.command('dump-queue-status') def dump_queue_status(): "Dump detailed diagnostic infomation for task queues in JSON format" frappe.init('') - from frappe.utils.doctor import dump_queue_status as _dump_queue_status + from frappe.utils.doctor import dump_queue_status as _dump_queue_status, inspect_queue print json.dumps(_dump_queue_status(), indent=1) + print inspect_queue @click.command('make-app') @click.argument('destination') diff --git a/frappe/exceptions.py b/frappe/exceptions.py index bdb6eb07a5..0eb4af42e3 100644 --- a/frappe/exceptions.py +++ b/frappe/exceptions.py @@ -59,3 +59,5 @@ class InvalidEmailAddressError(ValidationError): pass class TemplateNotFoundError(ValidationError): pass class UniqueValidationError(ValidationError): pass class AppNotInstalledError(ValidationError): pass +class IncorrectSitePath(ValidationError): pass + diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index 4374d54240..fd40f414f4 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -3,6 +3,7 @@ import json, base64, os import frappe.utils from frappe.celery_app import get_celery from frappe.utils.file_lock import check_lock, LockTimeoutError +from frappe.utils.scheduler import is_scheduler_disabled from collections import Counter from operator import itemgetter @@ -26,23 +27,32 @@ def get_queues(site=None): def get_task_body(taskstr): return json.loads(base64.decodestring(json.loads(taskstr)['body'])) -def purge_pending_tasks(event='all'): +def purge_pending_tasks(event=None, site=None): """ Purge tasks of the event event type. Passing 'all' will not purge all events but of the all event type, ie. the ones that are enqueued every five mintues and would any leave daily, hourly and weekly tasks """ r = get_redis_conn() - event_tasks = frappe.get_hooks()['scheduler_events'][event] + if event: + event_tasks = frappe.get_hooks()['scheduler_events'][event] count = 0 for queue in get_queues(): for taskstr in r.lrange(queue, 0, -1): taskbody = get_task_body(taskstr) kwargs = taskbody.get('kwargs') - if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: - r.lrem(queue, taskstr) - count += 1 + if kwargs: + if site and kwargs.get('site') != site: + continue + + if event: + if kwargs.get('handler') and kwargs.get('handler') in event_tasks: + r.lrem(queue, taskstr) + count += 1 + else: + r.lrem(queue, taskstr) + count += 1 return count def get_pending_task_count(): @@ -131,16 +141,30 @@ def doctor(): """ Prints diagnostic information for the scheduler """ - flag = False + print "Inspecting workers and queues..." workers_online = check_if_workers_online() pending_tasks = get_pending_task_count() + + print "Finding locks..." locks = get_timedout_locks() + + print "Checking scheduler status..." + for site in frappe.utils.get_sites(): + frappe.init(site) + frappe.connect() + if not is_scheduler_disabled(): + print "{0:40}: Scheduler disabled via System Settings or site_config.json".format(site) + frappe.destroy() + print "Workers online:", workers_online print "Pending tasks", pending_tasks print "Timed out locks:" print "\n".join(locks) if (not workers_online) or (pending_tasks > 4000) or locks: return 1 + + print "Note: To view pending tasks, use bench dump-queue-status" + return True def celery_doctor(site=None): @@ -149,6 +173,18 @@ def celery_doctor(site=None): print 'Queue Status' print '------------' print json.dumps(queues, indent=1) + print '' print 'Running Tasks' print '------------' print json.dumps(running_tasks, indent=1) + +def inspect_queue(): + print 'Pending Tasks Queue' + print '-'*20 + r = get_redis_conn() + for queue in get_queues(): + for taskstr in r.lrange(queue, 0, -1): + taskbody = get_task_body(taskstr) + kwargs = taskbody.get('kwargs') + if kwargs: + print frappe.as_json(kwargs)