|
|
@@ -5,7 +5,7 @@ from __future__ import unicode_literals |
|
|
|
import frappe |
|
|
|
from frappe.utils.scheduler import enqueue_events |
|
|
|
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX, ASYNC_TASKS_PREFIX |
|
|
|
from frappe.utils import get_sites |
|
|
|
from frappe.utils import get_sites, touch_file |
|
|
|
from frappe.utils.error import make_error_snapshot |
|
|
|
from frappe.utils.file_lock import create_lock, delete_lock |
|
|
|
from frappe.handler import execute_cmd |
|
|
@@ -17,6 +17,7 @@ import time |
|
|
|
import json |
|
|
|
import os |
|
|
|
import MySQLdb |
|
|
|
from frappe.utils.file_lock import check_lock, LockTimeoutError |
|
|
|
|
|
|
|
@celery_task() |
|
|
|
def sync_queues(): |
|
|
@@ -135,7 +136,17 @@ def scheduler_task(site, event, handler, now=False): |
|
|
|
@celery_task() |
|
|
|
def enqueue_scheduler_events(): |
|
|
|
for site in get_sites(): |
|
|
|
enqueue_events_for_site.delay(site=site) |
|
|
|
enqueue_lock = os.path.join(site, 'locks', 'enqueue.lock') |
|
|
|
try: |
|
|
|
if check_lock(enqueue_lock, timeout=1800): |
|
|
|
continue |
|
|
|
|
|
|
|
touch_file(enqueue_lock) |
|
|
|
|
|
|
|
enqueue_events_for_site.delay(site=site) |
|
|
|
|
|
|
|
except LockTimeoutError: |
|
|
|
os.remove(enqueue_lock) |
|
|
|
|
|
|
|
@celery_task() |
|
|
|
def enqueue_events_for_site(site): |
|
|
@@ -149,6 +160,7 @@ def enqueue_events_for_site(site): |
|
|
|
task_logger.error('Exception in Enqueue Events for Site {0}'.format(site)) |
|
|
|
raise |
|
|
|
finally: |
|
|
|
delete_lock('enqueue') |
|
|
|
frappe.destroy() |
|
|
|
|
|
|
|
@celery_task() |
|
|
|