From 04da07fe19147e66f01456bade77554d34c8cb3e Mon Sep 17 00:00:00 2001 From: Anand Doshi Date: Tue, 22 Dec 2015 14:29:36 +0530 Subject: [PATCH] [fix] always use per site queue, don't add queue when in maintenance mode --- frappe/celery_app.py | 12 +++++------- frappe/tasks.py | 18 ++++++++++++++++++ frappe/utils/doctor.py | 10 +++++----- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/frappe/celery_app.py b/frappe/celery_app.py index 45eb8a974f..08d29d4d07 100644 --- a/frappe/celery_app.py +++ b/frappe/celery_app.py @@ -49,8 +49,7 @@ def get_celery_app(): app.conf.CELERY_SEND_EVENTS = True app.conf.CELERY_SEND_TASK_SENT_EVENT = True - if conf.celery_queue_per_site: - app.conf.CELERY_ROUTES = (SiteRouter(), AsyncTaskRouter()) + app.conf.CELERY_ROUTES = (SiteRouter(), AsyncTaskRouter()) app.conf.CELERYBEAT_SCHEDULE = get_beat_schedule(conf) @@ -90,11 +89,10 @@ def get_beat_schedule(conf): }, } - if conf.celery_queue_per_site: - schedule['sync_queues'] = { - 'task': 'frappe.tasks.sync_queues', - 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) - } + schedule['sync_queues'] = { + 'task': 'frappe.tasks.sync_queues', + 'schedule': timedelta(seconds=conf.scheduler_interval or DEFAULT_SCHEDULER_INTERVAL) + } return schedule diff --git a/frappe/tasks.py b/frappe/tasks.py index a68d2d2a2a..d9903efc77 100644 --- a/frappe/tasks.py +++ b/frappe/tasks.py @@ -57,10 +57,15 @@ def sync_worker(app, worker, prefix=''): required_queues = set(get_required_queues(app, prefix=prefix)) to_add = required_queues - active_queues to_remove = active_queues - required_queues + for queue in to_add: + if is_site_in_maintenance_mode(queue, prefix): + continue + app.control.broadcast('add_consumer', arguments={ 'queue': queue }, reply=True, destination=[worker]) + for queue in to_remove: app.control.broadcast('cancel_consumer', arguments={ 'queue': queue @@ -79,6 +84,19 @@ def get_required_queues(app, prefix=''): ret.append(app.conf['CELERY_DEFAULT_QUEUE']) return ret +def is_site_in_maintenance_mode(queue, prefix): + # check if site is in maintenance mode + site = queue.replace(prefix, "") + try: + frappe.init(site=site) + if not frappe.local.conf.db_name or frappe.local.conf.maintenance_mode: + # don't add site if in maintenance mode + return True + finally: + frappe.destroy() + + return False + @celery_task() def scheduler_task(site, event, handler, now=False): traceback = "" diff --git a/frappe/utils/doctor.py b/frappe/utils/doctor.py index d443f3b3bf..4374d54240 100644 --- a/frappe/utils/doctor.py +++ b/frappe/utils/doctor.py @@ -16,11 +16,11 @@ def get_redis_conn(): def get_queues(site=None): "Returns the name of queues where frappe enqueues tasks as per the configuration" queues = ["celery"] - if frappe.conf.celery_queue_per_site: - sites = [site] if site else frappe.utils.get_sites() - for site in sites: - queues.append(site) - queues.append('longjobs@' + site) + sites = [site] if site else frappe.utils.get_sites() + for site in sites: + queues.append(site) + queues.append('longjobs@' + site) + return queues def get_task_body(taskstr):