Procházet zdrojové kódy

Merge pull request #1546 from rmehta/celery-inspect-queue

[admin] added bench purge-pending-tasks, and queue inspection in bench dump-queue-status
version-14
Anand Doshi před 9 roky
rodič
revize
146f2eb35e
4 změnil soubory, kde provedl 55 přidání a 13 odebrání
  1. +2
    -0
      frappe/__init__.py
  2. +9
    -7
      frappe/commands.py
  3. +2
    -0
      frappe/exceptions.py
  4. +42
    -6
      frappe/utils/doctor.py

+ 2
- 0
frappe/__init__.py Zobrazit soubor

@@ -173,6 +173,8 @@ def get_site_config(sites_path=None, site_path=None):
site_config = os.path.join(site_path, "site_config.json") site_config = os.path.join(site_path, "site_config.json")
if os.path.exists(site_config): if os.path.exists(site_config):
config.update(get_file_json(site_config)) config.update(get_file_json(site_config))
elif local.site:
raise IncorrectSitePath, "{0} does not exist".format(site_config)


return _dict(config) return _dict(config)




+ 9
- 7
frappe/commands.py Zobrazit soubor

@@ -828,7 +828,6 @@ def request(context, args):
def doctor(): def doctor():
"Get diagnostic info about background workers" "Get diagnostic info about background workers"
from frappe.utils.doctor import doctor as _doctor from frappe.utils.doctor import doctor as _doctor
frappe.init('')
return _doctor() return _doctor()


@click.command('celery-doctor') @click.command('celery-doctor')
@@ -839,20 +838,23 @@ def celery_doctor(site=None):
frappe.init('') frappe.init('')
return _celery_doctor(site=site) return _celery_doctor(site=site)


@click.command('purge-all-tasks')
def purge_all_tasks():
"Purge any pending periodic tasks of 'all' event. Doesn't purge hourly, daily and weekly"
frappe.init('')
@click.command('purge-pending-tasks')
@click.option('--site', help='site name')
@click.option('--event', default=None, help='one of "all", "weekly", "monthly", "hourly", "daily", "weekly_long", "daily_long"')
def purge_all_tasks(site=None, event=None):
"Purge any pending periodic tasks, if event option is not given, it will purge everything for the site"
from frappe.utils.doctor import purge_pending_tasks from frappe.utils.doctor import purge_pending_tasks
count = purge_pending_tasks()
frappe.init(site or '')
count = purge_pending_tasks(event=None, site=None)
print "Purged {} tasks".format(count) print "Purged {} tasks".format(count)


@click.command('dump-queue-status') @click.command('dump-queue-status')
def dump_queue_status(): def dump_queue_status():
"Dump detailed diagnostic infomation for task queues in JSON format" "Dump detailed diagnostic infomation for task queues in JSON format"
frappe.init('') frappe.init('')
from frappe.utils.doctor import dump_queue_status as _dump_queue_status
from frappe.utils.doctor import dump_queue_status as _dump_queue_status, inspect_queue
print json.dumps(_dump_queue_status(), indent=1) print json.dumps(_dump_queue_status(), indent=1)
print inspect_queue


@click.command('make-app') @click.command('make-app')
@click.argument('destination') @click.argument('destination')


+ 2
- 0
frappe/exceptions.py Zobrazit soubor

@@ -59,3 +59,5 @@ class InvalidEmailAddressError(ValidationError): pass
class TemplateNotFoundError(ValidationError): pass class TemplateNotFoundError(ValidationError): pass
class UniqueValidationError(ValidationError): pass class UniqueValidationError(ValidationError): pass
class AppNotInstalledError(ValidationError): pass class AppNotInstalledError(ValidationError): pass
class IncorrectSitePath(ValidationError): pass


+ 42
- 6
frappe/utils/doctor.py Zobrazit soubor

@@ -3,6 +3,7 @@ import json, base64, os
import frappe.utils import frappe.utils
from frappe.celery_app import get_celery from frappe.celery_app import get_celery
from frappe.utils.file_lock import check_lock, LockTimeoutError from frappe.utils.file_lock import check_lock, LockTimeoutError
from frappe.utils.scheduler import is_scheduler_disabled
from collections import Counter from collections import Counter
from operator import itemgetter from operator import itemgetter


@@ -26,23 +27,32 @@ def get_queues(site=None):
def get_task_body(taskstr): def get_task_body(taskstr):
return json.loads(base64.decodestring(json.loads(taskstr)['body'])) return json.loads(base64.decodestring(json.loads(taskstr)['body']))


def purge_pending_tasks(event='all'):
def purge_pending_tasks(event=None, site=None):
""" """
Purge tasks of the event event type. Passing 'all' will not purge all Purge tasks of the event event type. Passing 'all' will not purge all
events but of the all event type, ie. the ones that are enqueued every five events but of the all event type, ie. the ones that are enqueued every five
mintues and would any leave daily, hourly and weekly tasks mintues and would any leave daily, hourly and weekly tasks
""" """
r = get_redis_conn() r = get_redis_conn()
event_tasks = frappe.get_hooks()['scheduler_events'][event]
if event:
event_tasks = frappe.get_hooks()['scheduler_events'][event]
count = 0 count = 0


for queue in get_queues(): for queue in get_queues():
for taskstr in r.lrange(queue, 0, -1): for taskstr in r.lrange(queue, 0, -1):
taskbody = get_task_body(taskstr) taskbody = get_task_body(taskstr)
kwargs = taskbody.get('kwargs') kwargs = taskbody.get('kwargs')
if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks:
r.lrem(queue, taskstr)
count += 1
if kwargs:
if site and kwargs.get('site') != site:
continue

if event:
if kwargs.get('handler') and kwargs.get('handler') in event_tasks:
r.lrem(queue, taskstr)
count += 1
else:
r.lrem(queue, taskstr)
count += 1
return count return count


def get_pending_task_count(): def get_pending_task_count():
@@ -131,16 +141,30 @@ def doctor():
""" """
Prints diagnostic information for the scheduler Prints diagnostic information for the scheduler
""" """
flag = False
print "Inspecting workers and queues..."
workers_online = check_if_workers_online() workers_online = check_if_workers_online()
pending_tasks = get_pending_task_count() pending_tasks = get_pending_task_count()

print "Finding locks..."
locks = get_timedout_locks() locks = get_timedout_locks()

print "Checking scheduler status..."
for site in frappe.utils.get_sites():
frappe.init(site)
frappe.connect()
if not is_scheduler_disabled():
print "{0:40}: Scheduler disabled via System Settings or site_config.json".format(site)
frappe.destroy()

print "Workers online:", workers_online print "Workers online:", workers_online
print "Pending tasks", pending_tasks print "Pending tasks", pending_tasks
print "Timed out locks:" print "Timed out locks:"
print "\n".join(locks) print "\n".join(locks)
if (not workers_online) or (pending_tasks > 4000) or locks: if (not workers_online) or (pending_tasks > 4000) or locks:
return 1 return 1

print "Note: To view pending tasks, use bench dump-queue-status"

return True return True


def celery_doctor(site=None): def celery_doctor(site=None):
@@ -149,6 +173,18 @@ def celery_doctor(site=None):
print 'Queue Status' print 'Queue Status'
print '------------' print '------------'
print json.dumps(queues, indent=1) print json.dumps(queues, indent=1)
print ''
print 'Running Tasks' print 'Running Tasks'
print '------------' print '------------'
print json.dumps(running_tasks, indent=1) print json.dumps(running_tasks, indent=1)

def inspect_queue():
print 'Pending Tasks Queue'
print '-'*20
r = get_redis_conn()
for queue in get_queues():
for taskstr in r.lrange(queue, 0, -1):
taskbody = get_task_body(taskstr)
kwargs = taskbody.get('kwargs')
if kwargs:
print frappe.as_json(kwargs)

Načítá se…
Zrušit
Uložit