diff --git a/frappe/tasks.py b/frappe/tasks.py index 805d919782..66020ab297 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -4,7 +4,7 @@ from __future__ import unicode_literals import frappe from frappe.utils.scheduler import enqueue_events -from frappe.celery_app import get_celery, celery_task, task_logger, get_queue, LONGJOBS_PREFIX +from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX from frappe.cli import get_sites from frappe.utils.file_lock import delete_lock @@ -14,21 +14,20 @@ def sync_queues(): app = get_celery() shortjob_workers, longjob_workers = get_workers(app) - for site in get_sites(): - if shortjob_workers: - app.control.broadcast('add_consumer', arguments=get_queue(site), - reply=True, destination=shortjob_workers) - - if longjob_workers: - app.control.broadcast('add_consumer', arguments=get_queue(site, LONGJOBS_PREFIX), - reply=True, destination=longjob_workers) + if shortjob_workers: + for worker in shortjob_workers: + sync_worker(app, worker) + + if longjob_workers: + for worker in longjob_workers: + sync_worker(app, worker, prefix=LONGJOBS_PREFIX) def get_workers(app): longjob_workers = [] shortjob_workers = [] active_queues = app.control.inspect().active_queues() - for worker in active_queues.keys(): + for worker in active_queues: if worker.startswith(LONGJOBS_PREFIX): longjob_workers.append(worker) else: @@ -36,6 +35,32 @@ def get_workers(app): return shortjob_workers, longjob_workers +def sync_worker(app, worker, prefix=''): + active_queues = set(get_active_queues(app, worker)) + required_queues = set(get_required_queues(app, prefix=prefix)) + to_add = required_queues - active_queues + to_remove = active_queues - required_queues + for queue in to_add: + app.control.broadcast('add_consumer', arguments={ + 'queue': queue + }, reply=True, destination=[worker]) + for queue in to_remove: + app.control.broadcast('cancel_consumer', arguments={ + 'queue': queue + }, reply=True, destination=[worker]) + + +def get_active_queues(app, worker): + active_queues = app.control.inspect().active_queues() + return [queue['name'] for queue in active_queues[worker]] + +def get_required_queues(app, prefix=''): + ret = [] + for site in get_sites(): + ret.append('{}{}'.format(prefix, site)) + ret.append(app.conf['CELERY_DEFAULT_QUEUE']) + return ret + @celery_task() def scheduler_task(site, event, handler, now=False): from frappe.utils.scheduler import log