|
@@ -1,6 +1,6 @@ |
|
|
from __future__ import unicode_literals |
|
|
from __future__ import unicode_literals |
|
|
import json, base64, os |
|
|
import json, base64, os |
|
|
import frappe.cli |
|
|
|
|
|
|
|
|
import frappe.utils |
|
|
from frappe.celery_app import get_celery |
|
|
from frappe.celery_app import get_celery |
|
|
from frappe.utils.file_lock import check_lock, LockTimeoutError |
|
|
from frappe.utils.file_lock import check_lock, LockTimeoutError |
|
|
from collections import Counter |
|
|
from collections import Counter |
|
@@ -17,7 +17,7 @@ def get_queues(): |
|
|
"Returns the name of queues where frappe enqueues tasks as per the configuration" |
|
|
"Returns the name of queues where frappe enqueues tasks as per the configuration" |
|
|
queues = ["celery"] |
|
|
queues = ["celery"] |
|
|
if frappe.conf.celery_queue_per_site: |
|
|
if frappe.conf.celery_queue_per_site: |
|
|
for site in frappe.cli.get_sites(): |
|
|
|
|
|
|
|
|
for site in frappe.utils.get_sites(): |
|
|
queues.append(site) |
|
|
queues.append(site) |
|
|
queues.append('longjobs@' + site) |
|
|
queues.append('longjobs@' + site) |
|
|
return queues |
|
|
return queues |
|
@@ -36,7 +36,7 @@ def purge_pending_tasks(event='all'): |
|
|
count = 0 |
|
|
count = 0 |
|
|
|
|
|
|
|
|
for queue in get_queues(): |
|
|
for queue in get_queues(): |
|
|
for taskstr in r.lrange(queue, 0 , -1): |
|
|
|
|
|
|
|
|
for taskstr in r.lrange(queue, 0, -1): |
|
|
taskbody = get_task_body(taskstr) |
|
|
taskbody = get_task_body(taskstr) |
|
|
kwargs = taskbody.get('kwargs') |
|
|
kwargs = taskbody.get('kwargs') |
|
|
if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: |
|
|
if kwargs and kwargs.get('handler') and kwargs.get('handler') in event_tasks: |
|
@@ -55,7 +55,7 @@ def get_pending_task_count(): |
|
|
def get_timedout_locks(): |
|
|
def get_timedout_locks(): |
|
|
"Get list of stale locks from all sites" |
|
|
"Get list of stale locks from all sites" |
|
|
old_locks=[] |
|
|
old_locks=[] |
|
|
for site in frappe.cli.get_sites(): |
|
|
|
|
|
|
|
|
for site in frappe.utils.get_sites(): |
|
|
locksdir = os.path.join(site, 'locks') |
|
|
locksdir = os.path.join(site, 'locks') |
|
|
for lock in os.listdir(locksdir): |
|
|
for lock in os.listdir(locksdir): |
|
|
lock_path = os.path.join(locksdir, lock) |
|
|
lock_path = os.path.join(locksdir, lock) |
|
|