瀏覽代碼

Merge pull request #1847 from anandpdoshi/enqueue-lock

[fix] create enqueue.lock before queuing enqueue_events_for_site to prevent duplicate enqueue events
version-14
Anand Doshi 9 年之前
committed by GitHub
父節點
當前提交
2decd62dd5
共有 2 個檔案被更改,包括 17 行新增5 行删除
  1. +14
    -2
      frappe/tasks.py
  2. +3
    -3
      frappe/utils/file_lock.py

+ 14
- 2
frappe/tasks.py 查看文件

@@ -5,7 +5,7 @@ from __future__ import unicode_literals
import frappe import frappe
from frappe.utils.scheduler import enqueue_events from frappe.utils.scheduler import enqueue_events
from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX, ASYNC_TASKS_PREFIX from frappe.celery_app import get_celery, celery_task, task_logger, LONGJOBS_PREFIX, ASYNC_TASKS_PREFIX
from frappe.utils import get_sites
from frappe.utils import get_sites, touch_file
from frappe.utils.error import make_error_snapshot from frappe.utils.error import make_error_snapshot
from frappe.utils.file_lock import create_lock, delete_lock from frappe.utils.file_lock import create_lock, delete_lock
from frappe.handler import execute_cmd from frappe.handler import execute_cmd
@@ -17,6 +17,7 @@ import time
import json import json
import os import os
import MySQLdb import MySQLdb
from frappe.utils.file_lock import check_lock, LockTimeoutError


@celery_task() @celery_task()
def sync_queues(): def sync_queues():
@@ -135,7 +136,17 @@ def scheduler_task(site, event, handler, now=False):
@celery_task() @celery_task()
def enqueue_scheduler_events(): def enqueue_scheduler_events():
for site in get_sites(): for site in get_sites():
enqueue_events_for_site.delay(site=site)
enqueue_lock = os.path.join(site, 'locks', 'enqueue.lock')
try:
if check_lock(enqueue_lock, timeout=1800):
continue

touch_file(enqueue_lock)

enqueue_events_for_site.delay(site=site)

except LockTimeoutError:
os.remove(enqueue_lock)


@celery_task() @celery_task()
def enqueue_events_for_site(site): def enqueue_events_for_site(site):
@@ -149,6 +160,7 @@ def enqueue_events_for_site(site):
task_logger.error('Exception in Enqueue Events for Site {0}'.format(site)) task_logger.error('Exception in Enqueue Events for Site {0}'.format(site))
raise raise
finally: finally:
delete_lock('enqueue')
frappe.destroy() frappe.destroy()


@celery_task() @celery_task()


+ 3
- 3
frappe/utils/file_lock.py 查看文件

@@ -1,5 +1,5 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
# MIT License. See license.txt


from __future__ import unicode_literals from __future__ import unicode_literals


@@ -18,10 +18,10 @@ def create_lock(name):
else: else:
return False return False


def check_lock(path):
def check_lock(path, timeout=600):
if not os.path.exists(path): if not os.path.exists(path):
return False return False
if time() - os.path.getmtime(path) > 600:
if time() - os.path.getmtime(path) > timeout:
raise LockTimeoutError(path) raise LockTimeoutError(path)
return True return True




Loading…
取消
儲存