* fix: RQ reduce failed job TTL from 1 year to 7 days (cherry picked from commitversion-145152f13b10
) * feat: RQ worker virtual doctype Enuff manual JS and HTML crap, lets reuse list and form views! (cherry picked from commit330bd08210
) * feat: RQ job doctype (cherry picked from commitfc0ff7bd2e
) * feat: delete background jobs from form views (cherry picked from commitdca6592b7e
) * fix: disable count and stats (cherry picked from commitc021b07255
) * refactor: remove dead code related to scheduler activation (cherry picked from commit7a3c3910fe
) * fix: serialize function objects to their repr (cherry picked from commit6d264b149a
) * feat: show scheduler status on RQ job page (cherry picked from commit46df97c3f2
) * feat: auto refresh RQ job page (cherry picked from commit4ae85b20db
) * test: add tests for bg job monitoring code (cherry picked from commit2267d40420
) * feat: kill a running background job -f Also refactor permission checks (cherry picked from commit11936a76df
) Co-authored-by: Ankush Menat <ankush@frappe.io>
@@ -1198,6 +1198,9 @@ def validate_fields(meta): | |||||
frappe.throw(_("Precision should be between 1 and 6")) | frappe.throw(_("Precision should be between 1 and 6")) | ||||
def check_unique_and_text(docname, d): | def check_unique_and_text(docname, d): | ||||
if meta.is_virtual: | |||||
return | |||||
if meta.issingle: | if meta.issingle: | ||||
d.unique = 0 | d.unique = 0 | ||||
d.search_index = 0 | d.search_index = 0 | ||||
@@ -0,0 +1,30 @@ | |||||
// Copyright (c) 2022, Frappe Technologies and contributors | |||||
// For license information, please see license.txt | |||||
frappe.ui.form.on("RQ Job", { | |||||
refresh: function (frm) { | |||||
// Nothing in this form is supposed to be editable. | |||||
frm.disable_form(); | |||||
frm.dashboard.set_headline_alert( | |||||
"This is a virtual doctype and data is cleared periodically." | |||||
); | |||||
if (["started", "queued"].includes(frm.doc.status)) { | |||||
frm.add_custom_button(__("Force Stop job"), () => { | |||||
frappe.confirm( | |||||
"This will terminate the job immediately and might be dangerous, are you sure? ", | |||||
() => { | |||||
frappe | |||||
.xcall("frappe.core.doctype.rq_job.rq_job.stop_job", { | |||||
job_id: frm.doc.name, | |||||
}) | |||||
.then((r) => { | |||||
frappe.show_alert("Job Stopped Succefully"); | |||||
frm.reload_doc(); | |||||
}); | |||||
} | |||||
); | |||||
}); | |||||
} | |||||
}, | |||||
}); |
@@ -0,0 +1,162 @@ | |||||
{ | |||||
"actions": [], | |||||
"allow_copy": 1, | |||||
"autoname": "field:job_id", | |||||
"creation": "2022-09-10 16:19:37.934903", | |||||
"doctype": "DocType", | |||||
"editable_grid": 1, | |||||
"engine": "InnoDB", | |||||
"field_order": [ | |||||
"job_info_section", | |||||
"job_id", | |||||
"job_name", | |||||
"queue", | |||||
"timeout", | |||||
"column_break_5", | |||||
"arguments", | |||||
"job_status_section", | |||||
"status", | |||||
"time_taken", | |||||
"column_break_11", | |||||
"started_at", | |||||
"ended_at", | |||||
"exception_section", | |||||
"exc_info" | |||||
], | |||||
"fields": [ | |||||
{ | |||||
"fieldname": "queue", | |||||
"fieldtype": "Select", | |||||
"in_list_view": 1, | |||||
"in_standard_filter": 1, | |||||
"label": "Queue", | |||||
"options": "default\nshort\nlong" | |||||
}, | |||||
{ | |||||
"fieldname": "status", | |||||
"fieldtype": "Select", | |||||
"in_list_view": 1, | |||||
"in_standard_filter": 1, | |||||
"label": "Status", | |||||
"options": "queued\nstarted\nfinished\nfailed\ndeferred\nscheduled\ncanceled" | |||||
}, | |||||
{ | |||||
"fieldname": "job_id", | |||||
"fieldtype": "Data", | |||||
"label": "Job ID", | |||||
"unique": 1 | |||||
}, | |||||
{ | |||||
"fieldname": "exc_info", | |||||
"fieldtype": "Code", | |||||
"label": "Exception" | |||||
}, | |||||
{ | |||||
"fieldname": "job_name", | |||||
"fieldtype": "Data", | |||||
"label": "Job Name" | |||||
}, | |||||
{ | |||||
"fieldname": "arguments", | |||||
"fieldtype": "Code", | |||||
"label": "Arguments" | |||||
}, | |||||
{ | |||||
"fieldname": "timeout", | |||||
"fieldtype": "Duration", | |||||
"label": "Timeout" | |||||
}, | |||||
{ | |||||
"fieldname": "time_taken", | |||||
"fieldtype": "Duration", | |||||
"label": "Time Taken" | |||||
}, | |||||
{ | |||||
"fieldname": "started_at", | |||||
"fieldtype": "Datetime", | |||||
"label": "Started At" | |||||
}, | |||||
{ | |||||
"fieldname": "ended_at", | |||||
"fieldtype": "Datetime", | |||||
"label": "Ended At" | |||||
}, | |||||
{ | |||||
"fieldname": "job_info_section", | |||||
"fieldtype": "Section Break", | |||||
"label": "Job Info" | |||||
}, | |||||
{ | |||||
"fieldname": "job_status_section", | |||||
"fieldtype": "Section Break", | |||||
"label": "Job Status" | |||||
}, | |||||
{ | |||||
"fieldname": "column_break_5", | |||||
"fieldtype": "Column Break" | |||||
}, | |||||
{ | |||||
"fieldname": "column_break_11", | |||||
"fieldtype": "Column Break" | |||||
}, | |||||
{ | |||||
"fieldname": "exception_section", | |||||
"fieldtype": "Section Break" | |||||
} | |||||
], | |||||
"in_create": 1, | |||||
"is_virtual": 1, | |||||
"links": [], | |||||
"modified": "2022-09-11 05:27:50.878534", | |||||
"modified_by": "Administrator", | |||||
"module": "Core", | |||||
"name": "RQ Job", | |||||
"naming_rule": "By fieldname", | |||||
"owner": "Administrator", | |||||
"permissions": [ | |||||
{ | |||||
"email": 1, | |||||
"export": 1, | |||||
"print": 1, | |||||
"read": 1, | |||||
"report": 1, | |||||
"role": "System Manager", | |||||
"share": 1 | |||||
}, | |||||
{ | |||||
"delete": 1, | |||||
"email": 1, | |||||
"export": 1, | |||||
"print": 1, | |||||
"read": 1, | |||||
"report": 1, | |||||
"role": "Administrator", | |||||
"share": 1 | |||||
} | |||||
], | |||||
"sort_field": "modified", | |||||
"sort_order": "DESC", | |||||
"states": [ | |||||
{ | |||||
"color": "Yellow", | |||||
"title": "queued" | |||||
}, | |||||
{ | |||||
"color": "Blue", | |||||
"title": "started" | |||||
}, | |||||
{ | |||||
"color": "Red", | |||||
"title": "failed" | |||||
}, | |||||
{ | |||||
"color": "Green", | |||||
"title": "finished" | |||||
}, | |||||
{ | |||||
"color": "Orange", | |||||
"title": "cancelled" | |||||
} | |||||
], | |||||
"title_field": "job_name" | |||||
} |
@@ -0,0 +1,186 @@ | |||||
# Copyright (c) 2022, Frappe Technologies and contributors | |||||
# For license information, please see license.txt | |||||
import functools | |||||
from rq.command import send_stop_job_command | |||||
from rq.job import Job | |||||
from rq.queue import Queue | |||||
import frappe | |||||
from frappe.model.document import Document | |||||
from frappe.utils import ( | |||||
cint, | |||||
compare, | |||||
convert_utc_to_user_timezone, | |||||
create_batch, | |||||
make_filter_dict, | |||||
) | |||||
from frappe.utils.background_jobs import get_queues, get_redis_conn | |||||
QUEUES = ["default", "long", "short"] | |||||
JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"] | |||||
def check_permissions(method): | |||||
@functools.wraps(method) | |||||
def wrapper(*args, **kwargs): | |||||
frappe.only_for("System Manager") | |||||
job = args[0].job | |||||
if not for_current_site(job): | |||||
raise frappe.PermissionError | |||||
return method(*args, **kwargs) | |||||
return wrapper | |||||
class RQJob(Document): | |||||
def load_from_db(self): | |||||
job = Job.fetch(self.name, connection=get_redis_conn()) | |||||
if not for_current_site(job): | |||||
raise frappe.PermissionError | |||||
super(Document, self).__init__(serialize_job(job)) | |||||
self._job_obj = job | |||||
@property | |||||
def job(self): | |||||
return self._job_obj | |||||
@staticmethod | |||||
def get_list(args): | |||||
start = cint(args.get("start")) or 0 | |||||
page_length = cint(args.get("page_length")) or 20 | |||||
order_desc = "desc" in args.get("order_by", "") | |||||
matched_job_ids = RQJob.get_matching_job_ids(args) | |||||
jobs = [] | |||||
for job_ids in create_batch(matched_job_ids, 100): | |||||
jobs.extend( | |||||
serialize_job(job) | |||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()) | |||||
if job and for_current_site(job) | |||||
) | |||||
if len(jobs) > start + page_length: | |||||
# we have fetched enough. This is inefficient but because of site filtering TINA | |||||
break | |||||
return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length] | |||||
@staticmethod | |||||
def get_matching_job_ids(args): | |||||
filters = make_filter_dict(args.get("filters")) | |||||
queues = _eval_filters(filters.get("queue"), QUEUES) | |||||
statuses = _eval_filters(filters.get("status"), JOB_STATUSES) | |||||
matched_job_ids = [] | |||||
for queue in get_queues(): | |||||
if not queue.name.endswith(tuple(queues)): | |||||
continue | |||||
for status in statuses: | |||||
matched_job_ids.extend(fetch_job_ids(queue, status)) | |||||
return matched_job_ids | |||||
@check_permissions | |||||
def delete(self): | |||||
self.job.delete() | |||||
@check_permissions | |||||
def stop_job(self): | |||||
send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id) | |||||
@staticmethod | |||||
def get_count(args) -> int: | |||||
# Can not be implemented efficiently due to site filtering hence ignored. | |||||
return 0 | |||||
# None of these methods apply to virtual job doctype, overriden for sanity. | |||||
@staticmethod | |||||
def get_stats(args): | |||||
return {} | |||||
def db_insert(self, *args, **kwargs): | |||||
pass | |||||
def db_update(self, *args, **kwargs): | |||||
pass | |||||
def serialize_job(job: Job) -> frappe._dict: | |||||
modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at | |||||
return frappe._dict( | |||||
name=job.id, | |||||
job_id=job.id, | |||||
queue=job.origin.rsplit(":", 1)[1], | |||||
job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")), | |||||
status=job.get_status(), | |||||
started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "", | |||||
ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "", | |||||
time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "", | |||||
exc_info=job.exc_info, | |||||
arguments=frappe.as_json(job.kwargs), | |||||
timeout=job.timeout, | |||||
creation=convert_utc_to_user_timezone(job.created_at), | |||||
modified=convert_utc_to_user_timezone(modified), | |||||
_comment_count=0, | |||||
) | |||||
def for_current_site(job: Job) -> bool: | |||||
return job.kwargs.get("site") == frappe.local.site | |||||
def _eval_filters(filter, values: list[str]) -> list[str]: | |||||
if filter: | |||||
operator, operand = filter | |||||
return [val for val in values if compare(val, operator, operand)] | |||||
return values | |||||
def fetch_job_ids(queue: Queue, status: str) -> list[str]: | |||||
registry_map = { | |||||
"queued": queue, # self | |||||
"started": queue.started_job_registry, | |||||
"finished": queue.finished_job_registry, | |||||
"failed": queue.failed_job_registry, | |||||
"deferred": queue.deferred_job_registry, | |||||
"scheduled": queue.scheduled_job_registry, | |||||
"canceled": queue.canceled_job_registry, | |||||
} | |||||
registry = registry_map.get(status) | |||||
if registry is not None: | |||||
job_ids = registry.get_job_ids() | |||||
return [j for j in job_ids if j] | |||||
return [] | |||||
@frappe.whitelist() | |||||
def remove_failed_jobs(): | |||||
frappe.only_for("System Manager") | |||||
for queue in get_queues(): | |||||
fail_registry = queue.failed_job_registry | |||||
for job_ids in create_batch(fail_registry.get_job_ids(), 100): | |||||
for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()): | |||||
if job and for_current_site(job): | |||||
fail_registry.remove(job, delete_job=True) | |||||
def get_all_queued_jobs(): | |||||
jobs = [] | |||||
for q in get_queues(): | |||||
jobs.extend(q.get_jobs()) | |||||
return [job for job in jobs if for_current_site(job)] | |||||
@frappe.whitelist() | |||||
def stop_job(job_id): | |||||
frappe.get_doc("RQ Job", job_id).stop_job() |
@@ -0,0 +1,32 @@ | |||||
frappe.listview_settings["RQ Job"] = { | |||||
hide_name_column: true, | |||||
onload(listview) { | |||||
if (!has_common(frappe.user_roles, ["Administrator", "System Manager"])) return; | |||||
listview.page.add_inner_button(__("Remove Failed Jobs"), () => { | |||||
frappe.confirm(__("Are you sure you want to remove all failed jobs?"), () => { | |||||
frappe.xcall("frappe.core.doctype.rq_job.rq_job.remove_failed_jobs"); | |||||
}); | |||||
}); | |||||
if (listview.list_view_settings) { | |||||
listview.list_view_settings.disable_count = 1; | |||||
listview.list_view_settings.disable_sidebar_stats = 1; | |||||
} | |||||
frappe.xcall("frappe.utils.scheduler.get_scheduler_status").then(({ status }) => { | |||||
if (status === "active") { | |||||
listview.page.set_indicator(__("Scheduler: Active"), "green"); | |||||
} else { | |||||
listview.page.set_indicator(__("Scheduler: Inactive"), "red"); | |||||
} | |||||
}); | |||||
setInterval(() => { | |||||
if (!listview.list_view_settings.disable_auto_refresh) { | |||||
listview.refresh(); | |||||
} | |||||
}, 5000); | |||||
}, | |||||
}; |
@@ -0,0 +1,88 @@ | |||||
# Copyright (c) 2022, Frappe Technologies and Contributors | |||||
# See license.txt | |||||
import time | |||||
from rq import exceptions as rq_exc | |||||
from rq.job import Job | |||||
import frappe | |||||
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job | |||||
from frappe.tests.utils import FrappeTestCase, timeout | |||||
class TestRQJob(FrappeTestCase): | |||||
BG_JOB = "frappe.core.doctype.rq_job.test_rq_job.test_func" | |||||
@timeout(seconds=20) | |||||
def check_status(self, job: Job, status, wait=True): | |||||
if wait: | |||||
while True: | |||||
if job.is_queued or job.is_started: | |||||
time.sleep(0.2) | |||||
else: | |||||
break | |||||
self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status) | |||||
def test_serialization(self): | |||||
job = frappe.enqueue(method=self.BG_JOB, queue="short") | |||||
rq_job = frappe.get_doc("RQ Job", job.id) | |||||
self.assertEqual(job, rq_job.job) | |||||
self.assertDocumentEqual( | |||||
{ | |||||
"name": job.id, | |||||
"queue": "short", | |||||
"job_name": self.BG_JOB, | |||||
"status": "queued", | |||||
"exc_info": None, | |||||
}, | |||||
rq_job, | |||||
) | |||||
self.check_status(job, "finished") | |||||
def test_get_list_filtering(self): | |||||
# Check failed job clearning and filtering | |||||
remove_failed_jobs() | |||||
jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]}) | |||||
self.assertEqual(jobs, []) | |||||
# Fail a job | |||||
job = frappe.enqueue(method=self.BG_JOB, queue="short", fail=True) | |||||
self.check_status(job, "failed") | |||||
jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]}) | |||||
self.assertEqual(len(jobs), 1) | |||||
self.assertTrue(jobs[0].exc_info) | |||||
# Assert that non-failed job still exists | |||||
non_failed_jobs = RQJob.get_list({"filters": [["RQ Job", "status", "!=", "failed"]]}) | |||||
self.assertGreaterEqual(len(non_failed_jobs), 1) | |||||
# Create a slow job and check if it's stuck in "Started" | |||||
job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000) | |||||
time.sleep(3) | |||||
self.check_status(job, "started", wait=False) | |||||
stop_job(job_id=job.id) | |||||
self.check_status(job, "stopped") | |||||
def test_delete_doc(self): | |||||
job = frappe.enqueue(method=self.BG_JOB, queue="short") | |||||
frappe.get_doc("RQ Job", job.id).delete() | |||||
with self.assertRaises(rq_exc.NoSuchJobError): | |||||
job.refresh() | |||||
def test_func(fail=False, sleep=0): | |||||
if fail: | |||||
42 / 0 | |||||
if sleep: | |||||
time.sleep(sleep) | |||||
return True |
@@ -0,0 +1,9 @@ | |||||
// Copyright (c) 2022, Frappe Technologies and contributors | |||||
// For license information, please see license.txt | |||||
frappe.ui.form.on("RQ Worker", { | |||||
refresh: function (frm) { | |||||
// Nothing in this form is supposed to be editable. | |||||
frm.disable_form(); | |||||
}, | |||||
}); |
@@ -0,0 +1,138 @@ | |||||
{ | |||||
"actions": [], | |||||
"allow_copy": 1, | |||||
"creation": "2022-09-10 14:54:57.342170", | |||||
"doctype": "DocType", | |||||
"editable_grid": 1, | |||||
"engine": "InnoDB", | |||||
"field_order": [ | |||||
"worker_information_section", | |||||
"queue", | |||||
"queue_type", | |||||
"column_break_4", | |||||
"worker_name", | |||||
"statistics_section", | |||||
"status", | |||||
"pid", | |||||
"current_job_id", | |||||
"successful_job_count", | |||||
"failed_job_count", | |||||
"column_break_12", | |||||
"birth_date", | |||||
"last_heartbeat", | |||||
"total_working_time" | |||||
], | |||||
"fields": [ | |||||
{ | |||||
"fieldname": "worker_name", | |||||
"fieldtype": "Data", | |||||
"label": "Worker Name", | |||||
"unique": 1 | |||||
}, | |||||
{ | |||||
"fieldname": "status", | |||||
"fieldtype": "Data", | |||||
"in_list_view": 1, | |||||
"label": "Status" | |||||
}, | |||||
{ | |||||
"fieldname": "current_job_id", | |||||
"fieldtype": "Link", | |||||
"label": "Current Job ID", | |||||
"options": "RQ Job" | |||||
}, | |||||
{ | |||||
"fieldname": "pid", | |||||
"fieldtype": "Data", | |||||
"label": "PID" | |||||
}, | |||||
{ | |||||
"fieldname": "last_heartbeat", | |||||
"fieldtype": "Datetime", | |||||
"label": "Last Heartbeat" | |||||
}, | |||||
{ | |||||
"fieldname": "birth_date", | |||||
"fieldtype": "Datetime", | |||||
"label": "Start Time" | |||||
}, | |||||
{ | |||||
"fieldname": "successful_job_count", | |||||
"fieldtype": "Int", | |||||
"in_list_view": 1, | |||||
"label": "Successful Job Count" | |||||
}, | |||||
{ | |||||
"fieldname": "failed_job_count", | |||||
"fieldtype": "Int", | |||||
"in_list_view": 1, | |||||
"label": "Failed Job Count" | |||||
}, | |||||
{ | |||||
"fieldname": "total_working_time", | |||||
"fieldtype": "Duration", | |||||
"label": "Total Working Time" | |||||
}, | |||||
{ | |||||
"fieldname": "queue", | |||||
"fieldtype": "Data", | |||||
"label": "Queue" | |||||
}, | |||||
{ | |||||
"fieldname": "queue_type", | |||||
"fieldtype": "Select", | |||||
"in_list_view": 1, | |||||
"label": "Queue Type", | |||||
"options": "default\nlong\nshort" | |||||
}, | |||||
{ | |||||
"fieldname": "worker_information_section", | |||||
"fieldtype": "Section Break", | |||||
"label": "Worker Information" | |||||
}, | |||||
{ | |||||
"fieldname": "statistics_section", | |||||
"fieldtype": "Section Break", | |||||
"label": "Statistics" | |||||
}, | |||||
{ | |||||
"fieldname": "column_break_4", | |||||
"fieldtype": "Column Break" | |||||
}, | |||||
{ | |||||
"fieldname": "column_break_12", | |||||
"fieldtype": "Column Break" | |||||
} | |||||
], | |||||
"in_create": 1, | |||||
"is_virtual": 1, | |||||
"links": [], | |||||
"modified": "2022-09-11 05:02:53.981705", | |||||
"modified_by": "Administrator", | |||||
"module": "Core", | |||||
"name": "RQ Worker", | |||||
"owner": "Administrator", | |||||
"permissions": [ | |||||
{ | |||||
"email": 1, | |||||
"export": 1, | |||||
"print": 1, | |||||
"read": 1, | |||||
"report": 1, | |||||
"role": "System Manager", | |||||
"share": 1 | |||||
} | |||||
], | |||||
"sort_field": "modified", | |||||
"sort_order": "DESC", | |||||
"states": [ | |||||
{ | |||||
"color": "Blue", | |||||
"title": "idle" | |||||
}, | |||||
{ | |||||
"color": "Yellow", | |||||
"title": "busy" | |||||
} | |||||
] | |||||
} |
@@ -0,0 +1,67 @@ | |||||
# Copyright (c) 2022, Frappe Technologies and contributors | |||||
# For license information, please see license.txt | |||||
from rq import Worker | |||||
import frappe | |||||
from frappe.model.document import Document | |||||
from frappe.utils import cint, convert_utc_to_user_timezone | |||||
from frappe.utils.background_jobs import get_workers | |||||
class RQWorker(Document): | |||||
def load_from_db(self): | |||||
all_workers = get_workers() | |||||
worker = [w for w in all_workers if w.pid == cint(self.name)][0] | |||||
d = serialize_worker(worker) | |||||
super(Document, self).__init__(d) | |||||
@staticmethod | |||||
def get_list(args): | |||||
start = cint(args.get("start")) or 0 | |||||
page_length = cint(args.get("page_length")) or 20 | |||||
workers = get_workers()[start : start + page_length] | |||||
return [serialize_worker(worker) for worker in workers] | |||||
@staticmethod | |||||
def get_count(args) -> int: | |||||
return len(get_workers()) | |||||
# None of these methods apply to virtual workers, overriden for sanity. | |||||
@staticmethod | |||||
def get_stats(args): | |||||
return {} | |||||
def db_insert(self, *args, **kwargs): | |||||
pass | |||||
def db_update(self, *args, **kwargs): | |||||
pass | |||||
def delete(self): | |||||
pass | |||||
def serialize_worker(worker: Worker) -> frappe._dict: | |||||
queue = ", ".join(worker.queue_names()) | |||||
return frappe._dict( | |||||
name=worker.pid, | |||||
queue=queue, | |||||
queue_type=queue.rsplit(":", 1)[1], | |||||
worker_name=worker.name, | |||||
status=worker.get_state(), | |||||
pid=worker.pid, | |||||
current_job_id=worker.get_current_job_id(), | |||||
last_heartbeat=convert_utc_to_user_timezone(worker.last_heartbeat), | |||||
birth_date=convert_utc_to_user_timezone(worker.birth_date), | |||||
successful_job_count=worker.successful_job_count, | |||||
failed_job_count=worker.failed_job_count, | |||||
total_working_time=worker.total_working_time, | |||||
_comment_count=0, | |||||
modified=convert_utc_to_user_timezone(worker.last_heartbeat), | |||||
creation=convert_utc_to_user_timezone(worker.birth_date), | |||||
) |
@@ -0,0 +1,17 @@ | |||||
# Copyright (c) 2022, Frappe Technologies and Contributors | |||||
# See license.txt | |||||
import frappe | |||||
from frappe.core.doctype.rq_worker.rq_worker import RQWorker | |||||
from frappe.tests.utils import FrappeTestCase | |||||
class TestRQWorker(FrappeTestCase): | |||||
def test_get_worker_list(self): | |||||
workers = RQWorker.get_list({}) | |||||
self.assertGreaterEqual(len(workers), 1) | |||||
self.assertTrue(any(w.queue_type == "short" for w in workers)) | |||||
def test_worker_serialization(self): | |||||
workers = RQWorker.get_list({}) | |||||
frappe.get_doc("RQ Worker", workers[0].pid) |
@@ -1300,12 +1300,23 @@ class Database: | |||||
def enqueue_jobs_after_commit(): | def enqueue_jobs_after_commit(): | ||||
from frappe.utils.background_jobs import execute_job, get_queue | |||||
from frappe.utils.background_jobs import ( | |||||
RQ_JOB_FAILURE_TTL, | |||||
RQ_RESULTS_TTL, | |||||
execute_job, | |||||
get_queue, | |||||
) | |||||
if frappe.flags.enqueue_after_commit and len(frappe.flags.enqueue_after_commit) > 0: | if frappe.flags.enqueue_after_commit and len(frappe.flags.enqueue_after_commit) > 0: | ||||
for job in frappe.flags.enqueue_after_commit: | for job in frappe.flags.enqueue_after_commit: | ||||
q = get_queue(job.get("queue"), is_async=job.get("is_async")) | q = get_queue(job.get("queue"), is_async=job.get("is_async")) | ||||
q.enqueue_call(execute_job, timeout=job.get("timeout"), kwargs=job.get("queue_args")) | |||||
q.enqueue_call( | |||||
execute_job, | |||||
timeout=job.get("timeout"), | |||||
kwargs=job.get("queue_args"), | |||||
failure_ttl=RQ_JOB_FAILURE_TTL, | |||||
result_ttl=RQ_RESULTS_TTL, | |||||
) | |||||
frappe.flags.enqueue_after_commit = [] | frappe.flags.enqueue_after_commit = [] | ||||
@@ -147,17 +147,6 @@ frappe.Application = class Application { | |||||
this.link_preview = new frappe.ui.LinkPreview(); | this.link_preview = new frappe.ui.LinkPreview(); | ||||
if (!frappe.boot.developer_mode) { | if (!frappe.boot.developer_mode) { | ||||
setInterval(function () { | |||||
frappe.call({ | |||||
method: "frappe.core.page.background_jobs.background_jobs.get_scheduler_status", | |||||
callback: function (r) { | |||||
if (r.message[0] == __("Inactive")) { | |||||
frappe.call("frappe.utils.scheduler.activate_scheduler"); | |||||
} | |||||
}, | |||||
}); | |||||
}, 300000); // check every 5 minutes | |||||
if (frappe.user.has_role("System Manager")) { | if (frappe.user.has_role("System Manager")) { | ||||
setInterval(function () { | setInterval(function () { | ||||
frappe.call({ | frappe.call({ | ||||
@@ -9,6 +9,7 @@ from uuid import uuid4 | |||||
import redis | import redis | ||||
from redis.exceptions import BusyLoadingError, ConnectionError | from redis.exceptions import BusyLoadingError, ConnectionError | ||||
from rq import Connection, Queue, Worker | from rq import Connection, Queue, Worker | ||||
from rq.command import send_stop_job_command | |||||
from rq.logutils import setup_loghandlers | from rq.logutils import setup_loghandlers | ||||
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed | from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed | ||||
@@ -23,6 +24,11 @@ if TYPE_CHECKING: | |||||
from rq.job import Job | from rq.job import Job | ||||
# TTL to keep RQ job logs in redis for. | |||||
RQ_JOB_FAILURE_TTL = 7 * 24 * 60 * 60 # 7 days instead of 1 year (default) | |||||
RQ_RESULTS_TTL = 10 * 60 | |||||
@lru_cache | @lru_cache | ||||
def get_queues_timeout(): | def get_queues_timeout(): | ||||
common_site_config = frappe.get_conf() | common_site_config = frappe.get_conf() | ||||
@@ -103,7 +109,14 @@ def enqueue( | |||||
) | ) | ||||
return frappe.flags.enqueue_after_commit | return frappe.flags.enqueue_after_commit | ||||
return q.enqueue_call(execute_job, timeout=timeout, kwargs=queue_args, at_front=at_front) | |||||
return q.enqueue_call( | |||||
execute_job, | |||||
timeout=timeout, | |||||
kwargs=queue_args, | |||||
at_front=at_front, | |||||
failure_ttl=RQ_JOB_FAILURE_TTL, | |||||
result_ttl=RQ_RESULTS_TTL, | |||||
) | |||||
def enqueue_doc( | def enqueue_doc( | ||||
@@ -182,6 +182,9 @@ def json_handler(obj): | |||||
elif type(obj) == type or isinstance(obj, Exception): | elif type(obj) == type or isinstance(obj, Exception): | ||||
return repr(obj) | return repr(obj) | ||||
elif callable(obj): | |||||
return repr(obj) | |||||
else: | else: | ||||
raise TypeError( | raise TypeError( | ||||
f"""Object of type {type(obj)} with value of {repr(obj)} is not JSON serializable""" | f"""Object of type {type(obj)} with value of {repr(obj)} is not JSON serializable""" | ||||
@@ -177,3 +177,10 @@ def activate_scheduler(): | |||||
enable_scheduler() | enable_scheduler() | ||||
if frappe.conf.pause_scheduler: | if frappe.conf.pause_scheduler: | ||||
update_site_config("pause_scheduler", 0) | update_site_config("pause_scheduler", 0) | ||||
@frappe.whitelist() | |||||
def get_scheduler_status(): | |||||
if is_scheduler_inactive(): | |||||
return {"status": "inactive"} | |||||
return {"status": "active"} |