Explorar el Código

Merge pull request #952 from pdvyas/doctor

add Doctor
version-14
Nabin Hait hace 10 años
padre
commit
1f84087caa
Se han modificado 2 ficheros con 144 adiciones y 61 borrados
  1. +22
    -61
      frappe/cli.py
  2. +122
    -0
      frappe/utils/doctor.py

+ 22
- 61
frappe/cli.py Ver fichero

@@ -7,9 +7,10 @@ from __future__ import unicode_literals
import os import os
import subprocess import subprocess
import frappe import frappe
import json
from frappe.utils import cint from frappe.utils import cint


site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images']
site_arg_optional = ['serve', 'build', 'watch', 'celery', 'resize_images', 'doctor', 'dump_queue_status', 'purge_all_tasks']


def get_site(parsed_args): def get_site(parsed_args):
if not parsed_args.get("site") and os.path.exists(os.path.join(parsed_args["sites_path"], "currentsite.txt")): if not parsed_args.get("site") and os.path.exists(os.path.join(parsed_args["sites_path"], "currentsite.txt")):
@@ -273,6 +274,14 @@ def setup_utilities(parser):
parser.add_argument("--import_doc", nargs=1, metavar="PATH", parser.add_argument("--import_doc", nargs=1, metavar="PATH",
help="""Import (insert/update) doclist. If the argument is a directory, all files ending with .json are imported""") help="""Import (insert/update) doclist. If the argument is a directory, all files ending with .json are imported""")


# doctor
parser.add_argument("--doctor", default=False, action="store_true",
help="Print diagnostic information for task queues")
parser.add_argument("--purge_all_tasks", default=False, action="store_true",
help="Purge any pending periodic tasks of 'all' event. Doesn't purge hourly, daily and weekly")
parser.add_argument("--dump_queue_status", default=False, action="store_true",
help="Dump detailed diagnostic information for task queues in JSON format")

def setup_translation(parser): def setup_translation(parser):
parser.add_argument("--build_message_files", default=False, action="store_true", parser.add_argument("--build_message_files", default=False, action="store_true",
help="Build message files for translation.") help="Build message files for translation.")
@@ -949,68 +958,20 @@ def update_site_config(site_config, verbose=False):
frappe.destroy() frappe.destroy()


@cmd @cmd
def bump(repo, bump_type):
def doctor():
from frappe.utils.doctor import doctor as _doctor
return _doctor()


import json
assert repo in ['lib', 'app']
assert bump_type in ['minor', 'major', 'patch']

def validate(repo_path):
import git
repo = git.Repo(repo_path)
if repo.active_branch != 'master':
raise Exception, "Current branch not master in {}".format(repo_path)

def bump_version(version, version_type):
import semantic_version
v = semantic_version.Version(version)
if version_type == 'minor':
v.minor += 1
elif version_type == 'major':
v.major += 1
elif version_type == 'patch':
v.patch += 1
return unicode(v)

def add_tag(repo_path, version):
import git
repo = git.Repo(repo_path)
repo.index.add(['config.json'])
repo.index.commit('bumped to version {}'.format(version))
repo.create_tag('v' + version, repo.head)

def update_framework_requirement(version):
with open('app/config.json') as f:
config = json.load(f)
config['requires_framework_version'] = '==' + version
with open('app/config.json', 'w') as f:
json.dump(config, f, indent=1, sort_keys=True)

validate('lib/')
validate('app/')

if repo == 'app':
with open('app/config.json') as f:
config = json.load(f)
new_version = bump_version(config['app_version'], bump_type)
config['app_version'] = new_version
with open('app/config.json', 'w') as f:
json.dump(config, f, indent=1, sort_keys=True)
add_tag('app/', new_version)

elif repo == 'lib':
with open('lib/config.json') as f:
config = json.load(f)
new_version = bump_version(config['framework_version'], bump_type)
config['framework_version'] = new_version
with open('lib/config.json', 'w') as f:
json.dump(config, f, indent=1, sort_keys=True)
add_tag('lib/', new_version)

update_framework_requirement(new_version)

bump('app', bump_type)
@cmd
def purge_all_tasks():
	"""CLI command: purge pending periodic tasks of the 'all' event type and print how many were removed."""
	from frappe.utils.doctor import purge_pending_tasks
	count = purge_pending_tasks()
	print "Purged {} tasks".format(count)


@cmd
def dump_queue_status():
	"""CLI command: print detailed task-queue diagnostics as indented JSON."""
	from frappe.utils.doctor import dump_queue_status as _dump_queue_status
	print json.dumps(_dump_queue_status(), indent=1)


if __name__=="__main__": if __name__=="__main__":
out = main() out = main()


+ 122
- 0
frappe/utils/doctor.py Ver fichero

@@ -0,0 +1,122 @@
import json, base64, os
import frappe.cli
from frappe.celery_app import get_celery
from frappe.utils.file_lock import check_lock, LockTimeoutError
from collections import Counter
from operator import itemgetter

def get_redis_conn():
	"""Return the redis client that celery uses for its queues."""
	celery_app = get_celery()
	with celery_app.connection() as connection:
		return connection.default_channel.client

def get_queues():
	"""Return the queue names frappe enqueues tasks into, per configuration.

	Always includes the default "celery" queue; with celery_queue_per_site
	enabled, adds a per-site queue and a per-site long-jobs queue.
	"""
	queues = ["celery"]
	if frappe.conf.celery_queue_per_site:
		for site in frappe.cli.get_sites():
			queues.extend([site, 'longjobs@' + site])
	return queues

def get_task_body(taskstr):
return json.loads(base64.decodestring(json.loads(taskstr)['body']))

def purge_pending_tasks(event='all'):
	"""Purge pending periodic tasks of the given scheduler event type.

	Passing 'all' does not purge everything: it purges only tasks of the
	'all' event type (the ones enqueued every five minutes), leaving
	daily, hourly and weekly tasks untouched.

	Returns the number of purged tasks.
	"""
	r = get_redis_conn()
	event_tasks = frappe.get_hooks()['scheduler_events'][event]
	count = 0

	for queue in get_queues():
		for taskstr in r.lrange(queue, 0, -1):
			taskbody = get_task_body(taskstr)
			kwargs = taskbody.get('kwargs')
			handler = kwargs.get('handler') if kwargs else None
			if handler and handler in event_tasks:
				# NOTE(review): redis-py >= 3 changed the signature to
				# lrem(name, count, value) — confirm the client version.
				r.lrem(queue, taskstr)
				count += 1
	return count

def get_pending_task_count():
	"""Return the total number of tasks pending across all queues."""
	conn = get_redis_conn()
	return sum(conn.llen(queue) for queue in get_queues())

def get_timedout_locks():
	"""Return paths of stale (timed-out) lock files from all sites.

	A lock counts as stale when check_lock raises LockTimeoutError.
	Sites without a locks directory are skipped instead of raising
	OSError from os.listdir.
	"""
	old_locks = []
	for site in frappe.cli.get_sites():
		locksdir = os.path.join(site, 'locks')
		if not os.path.isdir(locksdir):
			# a freshly created site may not have a locks directory yet
			continue
		for lock in os.listdir(locksdir):
			lock_path = os.path.join(locksdir, lock)
			try:
				check_lock(lock_path)
			except LockTimeoutError:
				old_locks.append(lock_path)
	return old_locks

def check_if_workers_online():
	"""Return True when at least one celery worker answers a ping, else False."""
	return bool(get_celery().control.inspect().ping())

def dump_queue_status():
	"""Return pending task/event counts per queue, busiest queue first,
	with the aggregate pending-task total prepended as the first entry."""
	conn = get_redis_conn()
	statuses = []
	for queue in get_queues():
		entry = {
			'queue': queue,
			'len': conn.llen(queue),
		}
		entry.update(get_task_count_for_queue(queue))
		statuses.append(entry)

	statuses.sort(key=itemgetter('len'), reverse=True)
	return [{'total': get_pending_task_count()}] + statuses

def get_task_count_for_queue(queue):
	"""For a given queue, return the count of every pending task name and
	an aggregate count of pending scheduler events.

	Returns a dict with 'task_counts' and 'event_counts' Counters.
	"""
	r = get_redis_conn()
	tasks = [get_task_body(taskstr) for taskstr in r.lrange(queue, 0, -1)]
	task_counts = Counter(task['task'] for task in tasks)
	# tolerate tasks enqueued without kwargs instead of raising KeyError
	event_counts = Counter((task.get('kwargs') or {}).get('event') for task in tasks)
	return {
		'task_counts': task_counts,
		'event_counts': event_counts,
	}

def doctor():
"""
Prints diagnostic information for the scheduler
"""
flag = False
workers_online = check_if_workers_online()
pending_tasks = get_pending_task_count()
locks = get_timedout_locks()
print "Workers online:", workers_online
print "Pending tasks", pending_tasks
print "Timed out locks:"
print "\n".join(locks)
if (not workers_online) or (pending_tasks > 4000) or locks:
return 1
return True

Cargando…
Cancelar
Guardar