From 830a925b8268d74bc6b41ba7112cab953bb3ded5 Mon Sep 17 00:00:00 2001 From: Sagar Vora Date: Sun, 3 Apr 2022 11:07:12 +0530 Subject: [PATCH] feat: allow enqueue at front of Redis Queue --- frappe/tests/test_background_jobs.py | 16 ++++++++++++++++ frappe/utils/background_jobs.py | 20 +++++++++++++++----- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/frappe/tests/test_background_jobs.py b/frappe/tests/test_background_jobs.py index 188f3e166f..75f6cc8fe3 100644 --- a/frappe/tests/test_background_jobs.py +++ b/frappe/tests/test_background_jobs.py @@ -28,6 +28,22 @@ class TestBackgroundJobs(unittest.TestCase): fail_registry = queue.failed_job_registry self.assertEqual(fail_registry.count, 0) + def test_enqueue_at_front(self): + kwargs = { + "method": "frappe.handler.ping", + "queue": "short", + } + + # give worker something to work on first so that get_position doesn't return None + frappe.enqueue(**kwargs) + + # test enqueue with at_front=True + low_priority_job = frappe.enqueue(**kwargs) + high_priority_job = frappe.enqueue(**kwargs, at_front=True) + + # lesser is earlier + self.assertTrue(high_priority_job.get_position() < low_priority_job.get_position()) + def fail_function(): return 1 / 0 diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 8c324753dd..600a1fd4a9 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -40,8 +40,19 @@ def get_queues_timeout(): redis_connection = None -def enqueue(method, queue='default', timeout=None, event=None, - is_async=True, job_name=None, now=False, enqueue_after_commit=False, **kwargs): +def enqueue( + method, + queue='default', + timeout=None, + event=None, + is_async=True, + job_name=None, + now=False, + enqueue_after_commit=False, + *, + at_front=False, + **kwargs +): ''' Enqueue method to be executed using a background worker @@ -87,9 +98,8 @@ def enqueue(method, queue='default', timeout=None, event=None, "queue_args":queue_args }) return frappe.flags.enqueue_after_commit - else: - return q.enqueue_call(execute_job, timeout=timeout, - kwargs=queue_args) + + return q.enqueue_call(execute_job, timeout=timeout, kwargs=queue_args, at_front=at_front) def enqueue_doc(doctype, name=None, method=None, queue='default', timeout=300, now=False, **kwargs):