|
|
@@ -4,7 +4,8 @@ from rq import Connection, Queue, Worker |
|
|
|
from frappe.utils import cstr |
|
|
|
from collections import defaultdict |
|
|
|
import frappe |
|
|
|
import os, socket |
|
|
|
import MySQLdb |
|
|
|
import os, socket, time |
|
|
|
|
|
|
|
default_timeout = 300 |
|
|
|
queue_timeout = { |
|
|
@@ -26,7 +27,7 @@ def enqueue(method, queue='default', timeout=300, event=None, |
|
|
|
:param job_name: can be used to name an enqueue call, which can be used to prevent duplicate calls |
|
|
|
:param kwargs: keyword arguments to be passed to the method |
|
|
|
''' |
|
|
|
q = get_queue(queue) |
|
|
|
q = get_queue(queue, async=async) |
|
|
|
if not timeout: |
|
|
|
timeout = queue_timeout.get(queue) or 300 |
|
|
|
|
|
|
@@ -36,30 +37,56 @@ def enqueue(method, queue='default', timeout=300, event=None, |
|
|
|
"method": method, |
|
|
|
"event": event, |
|
|
|
"job_name": job_name or cstr(method), |
|
|
|
"kwargs":kwargs |
|
|
|
"async": async, |
|
|
|
"kwargs": kwargs |
|
|
|
}) |
|
|
|
|
|
|
|
def execute_job(site, method, event, job_name, kwargs): |
|
|
|
def execute_job(site, method, event, job_name, kwargs, async=True, retry=0): |
|
|
|
'''Executes job in a worker, performs commit/rollback and logs if there is any error''' |
|
|
|
from frappe.utils.scheduler import log |
|
|
|
frappe.connect(site) |
|
|
|
|
|
|
|
if async: |
|
|
|
frappe.connect(site) |
|
|
|
|
|
|
|
if isinstance(method, basestring): |
|
|
|
method_name = method |
|
|
|
method = frappe.get_attr(method) |
|
|
|
else: |
|
|
|
method_name = cstr(method) |
|
|
|
method_name = cstr(method.__name__) |
|
|
|
|
|
|
|
try: |
|
|
|
method(**kwargs) |
|
|
|
|
|
|
|
except (MySQLdb.OperationalError, frappe.RetryBackgroundJobError), e: |
|
|
|
frappe.db.rollback() |
|
|
|
|
|
|
|
if (retry < 5 and |
|
|
|
(isinstance(e, frappe.RetryBackgroundJobError) or e.args[0] in (1213, 1205))): |
|
|
|
# retry the job if |
|
|
|
# 1213 = deadlock |
|
|
|
# 1205 = lock wait timeout |
|
|
|
# or RetryBackgroundJobError is explicitly raised |
|
|
|
frappe.destroy() |
|
|
|
time.sleep(retry+1) |
|
|
|
|
|
|
|
return execute_job(site, method, event, job_name, kwargs, |
|
|
|
async=async, retry=retry+1) |
|
|
|
|
|
|
|
else: |
|
|
|
log(method_name, message=repr(locals())) |
|
|
|
raise |
|
|
|
|
|
|
|
except: |
|
|
|
frappe.db.rollback() |
|
|
|
log(method_name) |
|
|
|
log(method_name, message=repr(locals())) |
|
|
|
raise |
|
|
|
|
|
|
|
else: |
|
|
|
frappe.db.commit() |
|
|
|
|
|
|
|
finally: |
|
|
|
frappe.destroy() |
|
|
|
if async: |
|
|
|
frappe.destroy() |
|
|
|
|
|
|
|
def start_worker(queue=None): |
|
|
|
'''Wrapper to start rq worker. Connects to redis and monitors these queues.''' |
|
|
@@ -120,11 +147,11 @@ def get_queue_list(queue_list=None): |
|
|
|
else: |
|
|
|
return default_queue_list |
|
|
|
|
|
|
|
def get_queue(queue): |
|
|
|
def get_queue(queue, async=True): |
|
|
|
'''Returns a Queue object tied to a redis connection''' |
|
|
|
validate_queue(queue) |
|
|
|
|
|
|
|
return Queue(queue, connection=get_redis_conn()) |
|
|
|
return Queue(queue, connection=get_redis_conn(), async=async) |
|
|
|
|
|
|
|
def validate_queue(queue, default_queue_list=None): |
|
|
|
if not default_queue_list: |
|
|
|