diff --git a/frappe/cli.py b/frappe/cli.py index b4cc5813f2..dabb160b16 100755 --- a/frappe/cli.py +++ b/frappe/cli.py @@ -7,9 +7,10 @@ from __future__ import unicode_literals import os import subprocess import frappe +import json from frappe.utils import cint -site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images'] +site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images', 'doctor', 'dump_queue_status', 'purge_all_tasks'] def get_site(parsed_args): if not parsed_args.get("site") and os.path.exists(os.path.join(parsed_args["sites_path"], "currentsite.txt")): @@ -273,6 +274,14 @@ def setup_utilities(parser): parser.add_argument("--import_doc", nargs=1, metavar="PATH", help="""Import (insert/update) doclist. If the argument is a directory, all files ending with .json are imported""") + # doctor + parser.add_argument("--doctor", default=False, action="store_true", + help="Print diagnostic information for task queues") + parser.add_argument("--purge_all_tasks", default=False, action="store_true", + help="Purge any pending periodic tasks of 'all' event. 
Doesn't purge hourly, daily and weekly") + parser.add_argument("--dump_queue_status", default=False, action="store_true", + help="Dump detailed diagnostic information for task queues in JSON format") + + def setup_translation(parser): parser.add_argument("--build_message_files", default=False, action="store_true", help="Build message files for translation.") @@ -949,68 +958,20 @@ def update_site_config(site_config, verbose=False): frappe.destroy() @cmd -def bump(repo, bump_type): +def doctor(): + from frappe.utils.doctor import doctor as _doctor + return _doctor() - import json - assert repo in ['lib', 'app'] - assert bump_type in ['minor', 'major', 'patch'] - - def validate(repo_path): - import git - repo = git.Repo(repo_path) - if repo.active_branch != 'master': - raise Exception, "Current branch not master in {}".format(repo_path) - - def bump_version(version, version_type): - import semantic_version - v = semantic_version.Version(version) - if version_type == 'minor': - v.minor += 1 - elif version_type == 'major': - v.major += 1 - elif version_type == 'patch': - v.patch += 1 - return unicode(v) - - def add_tag(repo_path, version): - import git - repo = git.Repo(repo_path) - repo.index.add(['config.json']) - repo.index.commit('bumped to version {}'.format(version)) - repo.create_tag('v' + version, repo.head) - - def update_framework_requirement(version): - with open('app/config.json') as f: - config = json.load(f) - config['requires_framework_version'] = '==' + version - with open('app/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - - validate('lib/') - validate('app/') - - if repo == 'app': - with open('app/config.json') as f: - config = json.load(f) - new_version = bump_version(config['app_version'], bump_type) - config['app_version'] = new_version - with open('app/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - add_tag('app/', new_version) - - elif repo == 'lib': - with open('lib/config.json') as f: - 
config = json.load(f) - new_version = bump_version(config['framework_version'], bump_type) - config['framework_version'] = new_version - with open('lib/config.json', 'w') as f: - json.dump(config, f, indent=1, sort_keys=True) - add_tag('lib/', new_version) - - update_framework_requirement(new_version) - - bump('app', bump_type) +@cmd +def purge_all_tasks(): + from frappe.utils.doctor import purge_pending_tasks + count = purge_pending_tasks() + print "Purged {} tasks".format(count) +@cmd +def dump_queue_status(): + from frappe.utils.doctor import dump_queue_status as _dump_queue_status + print json.dumps(_dump_queue_status(), indent=1) if __name__=="__main__": out = main() diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py new file mode 100644 index 0000000000..1f2e00de3c --- /dev/null +++ b/frappe/utils/doctor.py @@ -0,0 +1,122 @@ +import json, base64, os +import frappe.cli +from frappe.celery_app import get_celery +from frappe.utils.file_lock import check_lock, LockTimeoutError +from collections import Counter +from operator import itemgetter + +def get_redis_conn(): + "Returns the redis connection that celery would use" + app = get_celery() + with app.connection() as conn: + r = conn.default_channel.client + return r + +def get_queues(): + "Returns the name of queues where frappe enqueues tasks as per the configuration" + queues = ["celery"] + if frappe.conf.celery_queue_per_site: + for site in frappe.cli.get_sites(): + queues.append(site) + queues.append('longjobs@' + site) + return queues + +def get_task_body(taskstr): + return json.loads(base64.decodestring(json.loads(taskstr)['body'])) + +def purge_pending_tasks(event='all'): + """ + Purge tasks of the given event type. Passing 'all' will not purge all + events but of the all event type, ie. 
the ones that are enqueued every five + minutes and would leave any daily, hourly and weekly tasks + """ + r = get_redis_conn() + event_tasks = frappe.get_hooks()['scheduler_events'][event] + count = 0 + + for queue in get_queues(): + for taskstr in r.lrange(queue, 0 , -1): + taskbody = get_task_body(taskstr) + kwargs = taskbody.get('kwargs') + if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: + r.lrem(queue, taskstr) + count += 1 + return count + +def get_pending_task_count(): + "Get count of pending tasks" + r = get_redis_conn() + pending = 0 + for queue in get_queues(): + pending += r.llen(queue) + return pending + +def get_timedout_locks(): + "Get list of stale locks from all sites" + old_locks=[] + for site in frappe.cli.get_sites(): + locksdir = os.path.join(site, 'locks') + for lock in os.listdir(locksdir): + lock_path = os.path.join(locksdir, lock) + try: + check_lock(lock_path) + except LockTimeoutError: + old_locks.append(lock_path) + return old_locks + +def check_if_workers_online(): + app = get_celery() + if app.control.inspect().ping(): + return True + return False + +def dump_queue_status(): + """ + Dumps pending events and tasks per queue + """ + ret = [] + r = get_redis_conn() + for queue in get_queues(): + queue_details = { + 'queue': queue, + 'len': r.llen(queue), + } + queue_details.update(get_task_count_for_queue(queue)) + ret.append(queue_details) + + ret = sorted(ret, key=itemgetter('len'), reverse=True) + ret.insert(0, { + 'total': get_pending_task_count() + }) + return ret + +def get_task_count_for_queue(queue): + """ + For a given queue, returns the count of every pending task and aggregate of + events pending + """ + r = get_redis_conn() + tasks = [get_task_body(taskstr) for taskstr in r.lrange(queue, 0, -1)] + task_names = [task['task'] for task in tasks] + task_counts = Counter(task_names) + event_counts = Counter(task['kwargs'].get('event') for task in tasks) + return { + 'task_counts': task_counts, + 
'event_counts': event_counts + } + +def doctor(): + """ + Prints diagnostic information for the scheduler + """ + flag = False + workers_online = check_if_workers_online() + pending_tasks = get_pending_task_count() + locks = get_timedout_locks() + print "Workers online:", workers_online + print "Pending tasks", pending_tasks + print "Timed out locks:" + print "\n".join(locks) + if (not workers_online) or (pending_tasks > 4000) or locks: + return 1 + return True