From 378053d11914621f8d771c6c02deab612a18400a Mon Sep 17 00:00:00 2001 From: Rushabh Mehta Date: Wed, 27 Jan 2016 18:51:38 +0530 Subject: [PATCH 1/2] [admin] added bench purge-pending-tasks, and queue inspection in bench dump-queue-status --- frappe/commands.py | 15 +++++++++------ frappe/utils/doctor.py | 35 +++++++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/frappe/commands.py b/frappe/commands.py index 0a19a220dd..81e3ee8689 100644 --- a/frappe/commands.py +++ b/frappe/commands.py @@ -839,20 +839,23 @@ def celery_doctor(site=None): frappe.init('') return _celery_doctor(site=site) -@click.command('purge-all-tasks') -def purge_all_tasks(): - "Purge any pending periodic tasks of 'all' event. Doesn't purge hourly, daily and weekly" - frappe.init('') +@click.command('purge-pending-tasks') +@click.option('--site', help='site name') +@click.option('--event', default=None, help='one of "all", "weekly", "monthly", "hourly", "daily", "weekly_long", "daily_long"') +def purge_all_tasks(site=None, event=None): + "Purge any pending periodic tasks, if event option is not given, it will purge everything for the site" from frappe.utils.doctor import purge_pending_tasks - count = purge_pending_tasks() + frappe.init(site or '') + count = purge_pending_tasks(event=None, site=None) print "Purged {} tasks".format(count) @click.command('dump-queue-status') def dump_queue_status(): "Dump detailed diagnostic infomation for task queues in JSON format" frappe.init('') - from frappe.utils.doctor import dump_queue_status as _dump_queue_status + from frappe.utils.doctor import dump_queue_status as _dump_queue_status, inspect_queue print json.dumps(_dump_queue_status(), indent=1) + print inspect_queue @click.command('make-app') @click.argument('destination') diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index 4374d54240..a7e5b9ffa6 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -26,23 +26,32 @@ def get_queues(site=None): def get_task_body(taskstr): return json.loads(base64.decodestring(json.loads(taskstr)['body'])) -def purge_pending_tasks(event='all'): +def purge_pending_tasks(event=None, site=None): """ Purge tasks of the event event type. Passing 'all' will not purge all events but of the all event type, ie. the ones that are enqueued every five mintues and would any leave daily, hourly and weekly tasks """ r = get_redis_conn() - event_tasks = frappe.get_hooks()['scheduler_events'][event] + if event: + event_tasks = frappe.get_hooks()['scheduler_events'][event] count = 0 for queue in get_queues(): for taskstr in r.lrange(queue, 0, -1): taskbody = get_task_body(taskstr) kwargs = taskbody.get('kwargs') - if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: - r.lrem(queue, taskstr) - count += 1 + if kwargs: + if site and kwargs.get('site') != site: + continue + + if event: + if kwargs.get('handler') and kwargs.get('handler') in event_tasks: + r.lrem(queue, taskstr) + count += 1 + else: + r.lrem(queue, taskstr) + count += 1 return count def get_pending_task_count(): @@ -131,7 +140,6 @@ def doctor(): """ Prints diagnostic information for the scheduler """ - flag = False workers_online = check_if_workers_online() pending_tasks = get_pending_task_count() locks = get_timedout_locks() @@ -141,6 +149,9 @@ def doctor(): print "\n".join(locks) if (not workers_online) or (pending_tasks > 4000) or locks: return 1 + + print "Note: To view pending tasks, use bench dump-queue-status" + return True def celery_doctor(site=None): @@ -149,6 +160,18 @@ def celery_doctor(site=None): print 'Queue Status' print '------------' print json.dumps(queues, indent=1) + print '' print 'Running Tasks' print '------------' print json.dumps(running_tasks, indent=1) + +def inspect_queue(): + print 'Pending Tasks Queue' + print '-'*20 + r = get_redis_conn() + for queue in get_queues(): + for taskstr in r.lrange(queue, 0, -1): + taskbody = get_task_body(taskstr) + kwargs = taskbody.get('kwargs') + if kwargs: + print frappe.as_json(kwargs) From 4f85bd3284fa4b09f5dbdd5d9ebf7704b94fd1f2 Mon Sep 17 00:00:00 2001 From: Rushabh Mehta Date: Thu, 28 Jan 2016 11:04:20 +0530 Subject: [PATCH 2/2] [bench] bench doctor to show if scheduler is disabled via system settings --- frappe/__init__.py | 2 ++ frappe/commands.py | 1 - frappe/exceptions.py | 2 ++ frappe/utils/doctor.py | 13 +++++++++++++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/frappe/__init__.py b/frappe/__init__.py index 8d558220ff..141da8f48b 100644 --- a/frappe/__init__.py +++ b/frappe/__init__.py @@ -173,6 +173,8 @@ def get_site_config(sites_path=None, site_path=None): site_config = os.path.join(site_path, "site_config.json") if os.path.exists(site_config): config.update(get_file_json(site_config)) + elif local.site: + raise IncorrectSitePath, "{0} does not exist".format(site_config) return _dict(config) diff --git a/frappe/commands.py b/frappe/commands.py index 81e3ee8689..1e0551dfe3 100644 --- a/frappe/commands.py +++ b/frappe/commands.py @@ -828,7 +828,6 @@ def request(context, args): def doctor(): "Get diagnostic info about background workers" from frappe.utils.doctor import doctor as _doctor - frappe.init('') return _doctor() @click.command('celery-doctor') diff --git a/frappe/exceptions.py b/frappe/exceptions.py index bdb6eb07a5..0eb4af42e3 100644 --- a/frappe/exceptions.py +++ b/frappe/exceptions.py @@ -59,3 +59,5 @@ class InvalidEmailAddressError(ValidationError): pass class TemplateNotFoundError(ValidationError): pass class UniqueValidationError(ValidationError): pass class AppNotInstalledError(ValidationError): pass +class IncorrectSitePath(ValidationError): pass + diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index a7e5b9ffa6..fd40f414f4 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -3,6 +3,7 @@ import json, base64, os import frappe.utils from frappe.celery_app import get_celery from frappe.utils.file_lock import check_lock, LockTimeoutError +from frappe.utils.scheduler import is_scheduler_disabled from collections import Counter from operator import itemgetter @@ -140,9 +141,21 @@ def doctor(): """ Prints diagnostic information for the scheduler """ + print "Inspecting workers and queues..." workers_online = check_if_workers_online() pending_tasks = get_pending_task_count() + + print "Finding locks..." locks = get_timedout_locks() + + print "Checking scheduler status..." + for site in frappe.utils.get_sites(): + frappe.init(site) + frappe.connect() + if not is_scheduler_disabled(): + print "{0:40}: Scheduler disabled via System Settings or site_config.json".format(site) + frappe.destroy() + print "Workers online:", workers_online print "Pending tasks", pending_tasks print "Timed out locks:"