diff --git a/frappe/core/doctype/doctype/doctype.py b/frappe/core/doctype/doctype/doctype.py index 9eabe0457d..1fa41da22a 100644 --- a/frappe/core/doctype/doctype/doctype.py +++ b/frappe/core/doctype/doctype/doctype.py @@ -1198,6 +1198,9 @@ def validate_fields(meta): frappe.throw(_("Precision should be between 1 and 6")) def check_unique_and_text(docname, d): + if meta.is_virtual: + return + if meta.issingle: d.unique = 0 d.search_index = 0 diff --git a/frappe/core/doctype/rq_job/__init__.py b/frappe/core/doctype/rq_job/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/frappe/core/doctype/rq_job/rq_job.js b/frappe/core/doctype/rq_job/rq_job.js new file mode 100644 index 0000000000..3f7a1a15b7 --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.js @@ -0,0 +1,30 @@ +// Copyright (c) 2022, Frappe Technologies and contributors +// For license information, please see license.txt + +frappe.ui.form.on("RQ Job", { + refresh: function (frm) { + // Nothing in this form is supposed to be editable. + frm.disable_form(); + frm.dashboard.set_headline_alert( + "This is a virtual doctype and data is cleared periodically." + ); + + if (["started", "queued"].includes(frm.doc.status)) { + frm.add_custom_button(__("Force Stop job"), () => { + frappe.confirm( + "This will terminate the job immediately and might be dangerous, are you sure? ", + () => { + frappe + .xcall("frappe.core.doctype.rq_job.rq_job.stop_job", { + job_id: frm.doc.name, + }) + .then((r) => { + frappe.show_alert("Job Stopped Succefully"); + frm.reload_doc(); + }); + } + ); + }); + } + }, +}); diff --git a/frappe/core/doctype/rq_job/rq_job.json b/frappe/core/doctype/rq_job/rq_job.json new file mode 100644 index 0000000000..7cae15cf59 --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.json @@ -0,0 +1,162 @@ +{ + "actions": [], + "allow_copy": 1, + "autoname": "field:job_id", + "creation": "2022-09-10 16:19:37.934903", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "job_info_section", + "job_id", + "job_name", + "queue", + "timeout", + "column_break_5", + "arguments", + "job_status_section", + "status", + "time_taken", + "column_break_11", + "started_at", + "ended_at", + "exception_section", + "exc_info" + ], + "fields": [ + { + "fieldname": "queue", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Queue", + "options": "default\nshort\nlong" + }, + { + "fieldname": "status", + "fieldtype": "Select", + "in_list_view": 1, + "in_standard_filter": 1, + "label": "Status", + "options": "queued\nstarted\nfinished\nfailed\ndeferred\nscheduled\ncanceled" + }, + { + "fieldname": "job_id", + "fieldtype": "Data", + "label": "Job ID", + "unique": 1 + }, + { + "fieldname": "exc_info", + "fieldtype": "Code", + "label": "Exception" + }, + { + "fieldname": "job_name", + "fieldtype": "Data", + "label": "Job Name" + }, + { + "fieldname": "arguments", + "fieldtype": "Code", + "label": "Arguments" + }, + { + "fieldname": "timeout", + "fieldtype": "Duration", + "label": "Timeout" + }, + { + "fieldname": "time_taken", + "fieldtype": "Duration", + "label": "Time Taken" + }, + { + "fieldname": "started_at", + "fieldtype": "Datetime", + "label": "Started At" + }, + { + "fieldname": "ended_at", + "fieldtype": "Datetime", + "label": "Ended At" + }, + { + "fieldname": "job_info_section", + "fieldtype": "Section Break", + "label": "Job Info" + }, + { + "fieldname": "job_status_section", + "fieldtype": "Section Break", + "label": "Job Status" + }, + { + "fieldname": 
"column_break_5", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_11", + "fieldtype": "Column Break" + }, + { + "fieldname": "exception_section", + "fieldtype": "Section Break" + } + ], + "in_create": 1, + "is_virtual": 1, + "links": [], + "modified": "2022-09-11 05:27:50.878534", + "modified_by": "Administrator", + "module": "Core", + "name": "RQ Job", + "naming_rule": "By fieldname", + "owner": "Administrator", + "permissions": [ + { + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1 + }, + { + "delete": 1, + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "Administrator", + "share": 1 + } + ], + "sort_field": "modified", + "sort_order": "DESC", + "states": [ + { + "color": "Yellow", + "title": "queued" + }, + { + "color": "Blue", + "title": "started" + }, + { + "color": "Red", + "title": "failed" + }, + { + "color": "Green", + "title": "finished" + }, + { + "color": "Orange", + "title": "cancelled" + } + ], + "title_field": "job_name" +} \ No newline at end of file diff --git a/frappe/core/doctype/rq_job/rq_job.py b/frappe/core/doctype/rq_job/rq_job.py new file mode 100644 index 0000000000..bceb93c0a4 --- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job.py @@ -0,0 +1,186 @@ +# Copyright (c) 2022, Frappe Technologies and contributors +# For license information, please see license.txt + +import functools + +from rq.command import send_stop_job_command +from rq.job import Job +from rq.queue import Queue + +import frappe +from frappe.model.document import Document +from frappe.utils import ( + cint, + compare, + convert_utc_to_user_timezone, + create_batch, + make_filter_dict, +) +from frappe.utils.background_jobs import get_queues, get_redis_conn + +QUEUES = ["default", "long", "short"] +JOB_STATUSES = ["queued", "started", "failed", "finished", "deferred", "scheduled", "canceled"] + + +def check_permissions(method): + @functools.wraps(method) + def wrapper(*args, **kwargs): + frappe.only_for("System Manager") + job = args[0].job + if not for_current_site(job): + raise frappe.PermissionError + + return method(*args, **kwargs) + + return wrapper + + +class RQJob(Document): + def load_from_db(self): + job = Job.fetch(self.name, connection=get_redis_conn()) + if not for_current_site(job): + raise frappe.PermissionError + super(Document, self).__init__(serialize_job(job)) + self._job_obj = job + + @property + def job(self): + return self._job_obj + + @staticmethod + def get_list(args): + + start = cint(args.get("start")) or 0 + page_length = cint(args.get("page_length")) or 20 + + order_desc = "desc" in args.get("order_by", "") + + matched_job_ids = RQJob.get_matching_job_ids(args) + + jobs = [] + for job_ids in create_batch(matched_job_ids, 100): + jobs.extend( + serialize_job(job) + for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()) + if job and for_current_site(job) + ) + if len(jobs) > start + page_length: + # we have fetched enough. 
This is inefficient but because of site filtering TINA + break + + return sorted(jobs, key=lambda j: j.modified, reverse=order_desc)[start : start + page_length] + + @staticmethod + def get_matching_job_ids(args): + filters = make_filter_dict(args.get("filters")) + + queues = _eval_filters(filters.get("queue"), QUEUES) + statuses = _eval_filters(filters.get("status"), JOB_STATUSES) + + matched_job_ids = [] + for queue in get_queues(): + if not queue.name.endswith(tuple(queues)): + continue + for status in statuses: + matched_job_ids.extend(fetch_job_ids(queue, status)) + + return matched_job_ids + + @check_permissions + def delete(self): + self.job.delete() + + @check_permissions + def stop_job(self): + send_stop_job_command(connection=get_redis_conn(), job_id=self.job_id) + + @staticmethod + def get_count(args) -> int: + # Can not be implemented efficiently due to site filtering hence ignored. + return 0 + + # None of these methods apply to virtual job doctype, overriden for sanity. + @staticmethod + def get_stats(args): + return {} + + def db_insert(self, *args, **kwargs): + pass + + def db_update(self, *args, **kwargs): + pass + + +def serialize_job(job: Job) -> frappe._dict: + modified = job.last_heartbeat or job.ended_at or job.started_at or job.created_at + + return frappe._dict( + name=job.id, + job_id=job.id, + queue=job.origin.rsplit(":", 1)[1], + job_name=job.kwargs.get("kwargs", {}).get("job_type") or str(job.kwargs.get("job_name")), + status=job.get_status(), + started_at=convert_utc_to_user_timezone(job.started_at) if job.started_at else "", + ended_at=convert_utc_to_user_timezone(job.ended_at) if job.ended_at else "", + time_taken=(job.ended_at - job.started_at).total_seconds() if job.ended_at else "", + exc_info=job.exc_info, + arguments=frappe.as_json(job.kwargs), + timeout=job.timeout, + creation=convert_utc_to_user_timezone(job.created_at), + modified=convert_utc_to_user_timezone(modified), + _comment_count=0, + ) + + +def for_current_site(job: Job) -> bool: + return job.kwargs.get("site") == frappe.local.site + + +def _eval_filters(filter, values: list[str]) -> list[str]: + if filter: + operator, operand = filter + return [val for val in values if compare(val, operator, operand)] + return values + + +def fetch_job_ids(queue: Queue, status: str) -> list[str]: + registry_map = { + "queued": queue, # self + "started": queue.started_job_registry, + "finished": queue.finished_job_registry, + "failed": queue.failed_job_registry, + "deferred": queue.deferred_job_registry, + "scheduled": queue.scheduled_job_registry, + "canceled": queue.canceled_job_registry, + } + + registry = registry_map.get(status) + if registry is not None: + job_ids = registry.get_job_ids() + return [j for j in job_ids if j] + + return [] + + +@frappe.whitelist() +def remove_failed_jobs(): + frappe.only_for("System Manager") + for queue in get_queues(): + fail_registry = queue.failed_job_registry + for job_ids in create_batch(fail_registry.get_job_ids(), 100): + for job in Job.fetch_many(job_ids=job_ids, connection=get_redis_conn()): + if job and for_current_site(job): + fail_registry.remove(job, delete_job=True) + + +def get_all_queued_jobs(): + jobs = [] + for q in get_queues(): + jobs.extend(q.get_jobs()) + + return [job for job in jobs if for_current_site(job)] + + +@frappe.whitelist() +def stop_job(job_id): + frappe.get_doc("RQ Job", job_id).stop_job() diff --git a/frappe/core/doctype/rq_job/rq_job_list.js b/frappe/core/doctype/rq_job/rq_job_list.js new file mode 100644 index 0000000000..5f6646cd65 
--- /dev/null +++ b/frappe/core/doctype/rq_job/rq_job_list.js @@ -0,0 +1,32 @@ +frappe.listview_settings["RQ Job"] = { + hide_name_column: true, + + onload(listview) { + if (!has_common(frappe.user_roles, ["Administrator", "System Manager"])) return; + + listview.page.add_inner_button(__("Remove Failed Jobs"), () => { + frappe.confirm(__("Are you sure you want to remove all failed jobs?"), () => { + frappe.xcall("frappe.core.doctype.rq_job.rq_job.remove_failed_jobs"); + }); + }); + + if (listview.list_view_settings) { + listview.list_view_settings.disable_count = 1; + listview.list_view_settings.disable_sidebar_stats = 1; + } + + frappe.xcall("frappe.utils.scheduler.get_scheduler_status").then(({ status }) => { + if (status === "active") { + listview.page.set_indicator(__("Scheduler: Active"), "green"); + } else { + listview.page.set_indicator(__("Scheduler: Inactive"), "red"); + } + }); + + setInterval(() => { + if (!listview.list_view_settings.disable_auto_refresh) { + listview.refresh(); + } + }, 5000); + }, +}; diff --git a/frappe/core/doctype/rq_job/test_rq_job.py b/frappe/core/doctype/rq_job/test_rq_job.py new file mode 100644 index 0000000000..4a51f54c0d --- /dev/null +++ b/frappe/core/doctype/rq_job/test_rq_job.py @@ -0,0 +1,88 @@ +# Copyright (c) 2022, Frappe Technologies and Contributors + +# See license.txt + +import time + +from rq import exceptions as rq_exc +from rq.job import Job + +import frappe +from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job +from frappe.tests.utils import FrappeTestCase, timeout + + +class TestRQJob(FrappeTestCase): + + BG_JOB = "frappe.core.doctype.rq_job.test_rq_job.test_func" + + @timeout(seconds=20) + def check_status(self, job: Job, status, wait=True): + if wait: + while True: + if job.is_queued or job.is_started: + time.sleep(0.2) + else: + break + self.assertEqual(frappe.get_doc("RQ Job", job.id).status, status) + + def test_serialization(self): + + job = frappe.enqueue(method=self.BG_JOB, queue="short") + + rq_job = frappe.get_doc("RQ Job", job.id) + + self.assertEqual(job, rq_job.job) + + self.assertDocumentEqual( + { + "name": job.id, + "queue": "short", + "job_name": self.BG_JOB, + "status": "queued", + "exc_info": None, + }, + rq_job, + ) + self.check_status(job, "finished") + + def test_get_list_filtering(self): + + # Check failed job clearning and filtering + remove_failed_jobs() + jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]}) + self.assertEqual(jobs, []) + + # Fail a job + job = frappe.enqueue(method=self.BG_JOB, queue="short", fail=True) + self.check_status(job, "failed") + jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]}) + self.assertEqual(len(jobs), 1) + self.assertTrue(jobs[0].exc_info) + + # Assert that non-failed job still exists + non_failed_jobs = RQJob.get_list({"filters": [["RQ Job", "status", "!=", "failed"]]}) + self.assertGreaterEqual(len(non_failed_jobs), 1) + + # Create a slow job and check if it's stuck in "Started" + job = frappe.enqueue(method=self.BG_JOB, queue="short", sleep=1000) + time.sleep(3) + self.check_status(job, "started", wait=False) + stop_job(job_id=job.id) + self.check_status(job, "stopped") + + def test_delete_doc(self): + job = frappe.enqueue(method=self.BG_JOB, queue="short") + frappe.get_doc("RQ Job", job.id).delete() + + with self.assertRaises(rq_exc.NoSuchJobError): + job.refresh() + + +def test_func(fail=False, sleep=0): + if fail: + 42 / 0 + if sleep: + time.sleep(sleep) + + return True diff --git 
a/frappe/core/doctype/rq_worker/__init__.py b/frappe/core/doctype/rq_worker/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/frappe/core/doctype/rq_worker/rq_worker.js b/frappe/core/doctype/rq_worker/rq_worker.js new file mode 100644 index 0000000000..622cb30cb9 --- /dev/null +++ b/frappe/core/doctype/rq_worker/rq_worker.js @@ -0,0 +1,9 @@ +// Copyright (c) 2022, Frappe Technologies and contributors +// For license information, please see license.txt + +frappe.ui.form.on("RQ Worker", { + refresh: function (frm) { + // Nothing in this form is supposed to be editable. + frm.disable_form(); + }, +}); diff --git a/frappe/core/doctype/rq_worker/rq_worker.json b/frappe/core/doctype/rq_worker/rq_worker.json new file mode 100644 index 0000000000..ea65abd482 --- /dev/null +++ b/frappe/core/doctype/rq_worker/rq_worker.json @@ -0,0 +1,138 @@ +{ + "actions": [], + "allow_copy": 1, + "creation": "2022-09-10 14:54:57.342170", + "doctype": "DocType", + "editable_grid": 1, + "engine": "InnoDB", + "field_order": [ + "worker_information_section", + "queue", + "queue_type", + "column_break_4", + "worker_name", + "statistics_section", + "status", + "pid", + "current_job_id", + "successful_job_count", + "failed_job_count", + "column_break_12", + "birth_date", + "last_heartbeat", + "total_working_time" + ], + "fields": [ + { + "fieldname": "worker_name", + "fieldtype": "Data", + "label": "Worker Name", + "unique": 1 + }, + { + "fieldname": "status", + "fieldtype": "Data", + "in_list_view": 1, + "label": "Status" + }, + { + "fieldname": "current_job_id", + "fieldtype": "Link", + "label": "Current Job ID", + "options": "RQ Job" + }, + { + "fieldname": "pid", + "fieldtype": "Data", + "label": "PID" + }, + { + "fieldname": "last_heartbeat", + "fieldtype": "Datetime", + "label": "Last Heartbeat" + }, + { + "fieldname": "birth_date", + "fieldtype": "Datetime", + "label": "Start Time" + }, + { + "fieldname": "successful_job_count", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Successful Job Count" + }, + { + "fieldname": "failed_job_count", + "fieldtype": "Int", + "in_list_view": 1, + "label": "Failed Job Count" + }, + { + "fieldname": "total_working_time", + "fieldtype": "Duration", + "label": "Total Working Time" + }, + { + "fieldname": "queue", + "fieldtype": "Data", + "label": "Queue" + }, + { + "fieldname": "queue_type", + "fieldtype": "Select", + "in_list_view": 1, + "label": "Queue Type", + "options": "default\nlong\nshort" + }, + { + "fieldname": "worker_information_section", + "fieldtype": "Section Break", + "label": "Worker Information" + }, + { + "fieldname": "statistics_section", + "fieldtype": "Section Break", + "label": "Statistics" + }, + { + "fieldname": "column_break_4", + "fieldtype": "Column Break" + }, + { + "fieldname": "column_break_12", + "fieldtype": "Column Break" + } + ], + "in_create": 1, + "is_virtual": 1, + "links": [], + "modified": "2022-09-11 05:02:53.981705", + "modified_by": "Administrator", + "module": "Core", + "name": "RQ Worker", + "owner": "Administrator", + "permissions": [ + { + "email": 1, + "export": 1, + "print": 1, + "read": 1, + "report": 1, + "role": "System Manager", + "share": 1 + } + ], + "sort_field": "modified", + "sort_order": "DESC", + "states": [ + { + "color": "Blue", + "title": "idle" + }, + { + "color": "Yellow", + "title": "busy" + } + ] +} \ No newline at end of file diff --git a/frappe/core/doctype/rq_worker/rq_worker.py b/frappe/core/doctype/rq_worker/rq_worker.py new file mode 100644 index 0000000000..bc1326f522 --- 
/dev/null +++ b/frappe/core/doctype/rq_worker/rq_worker.py @@ -0,0 +1,67 @@ +# Copyright (c) 2022, Frappe Technologies and contributors +# For license information, please see license.txt + +from rq import Worker + +import frappe +from frappe.model.document import Document +from frappe.utils import cint, convert_utc_to_user_timezone +from frappe.utils.background_jobs import get_workers + + +class RQWorker(Document): + def load_from_db(self): + + all_workers = get_workers() + worker = [w for w in all_workers if w.pid == cint(self.name)][0] + d = serialize_worker(worker) + + super(Document, self).__init__(d) + + @staticmethod + def get_list(args): + start = cint(args.get("start")) or 0 + page_length = cint(args.get("page_length")) or 20 + + workers = get_workers()[start : start + page_length] + return [serialize_worker(worker) for worker in workers] + + @staticmethod + def get_count(args) -> int: + return len(get_workers()) + + # None of these methods apply to virtual workers, overriden for sanity. + @staticmethod + def get_stats(args): + return {} + + def db_insert(self, *args, **kwargs): + pass + + def db_update(self, *args, **kwargs): + pass + + def delete(self): + pass + + +def serialize_worker(worker: Worker) -> frappe._dict: + queue = ", ".join(worker.queue_names()) + + return frappe._dict( + name=worker.pid, + queue=queue, + queue_type=queue.rsplit(":", 1)[1], + worker_name=worker.name, + status=worker.get_state(), + pid=worker.pid, + current_job_id=worker.get_current_job_id(), + last_heartbeat=convert_utc_to_user_timezone(worker.last_heartbeat), + birth_date=convert_utc_to_user_timezone(worker.birth_date), + successful_job_count=worker.successful_job_count, + failed_job_count=worker.failed_job_count, + total_working_time=worker.total_working_time, + _comment_count=0, + modified=convert_utc_to_user_timezone(worker.last_heartbeat), + creation=convert_utc_to_user_timezone(worker.birth_date), + ) diff --git a/frappe/core/doctype/rq_worker/test_rq_worker.py b/frappe/core/doctype/rq_worker/test_rq_worker.py new file mode 100644 index 0000000000..5a43270681 --- /dev/null +++ b/frappe/core/doctype/rq_worker/test_rq_worker.py @@ -0,0 +1,17 @@ +# Copyright (c) 2022, Frappe Technologies and Contributors +# See license.txt + +import frappe +from frappe.core.doctype.rq_worker.rq_worker import RQWorker +from frappe.tests.utils import FrappeTestCase + + +class TestRQWorker(FrappeTestCase): + def test_get_worker_list(self): + workers = RQWorker.get_list({}) + self.assertGreaterEqual(len(workers), 1) + self.assertTrue(any(w.queue_type == "short" for w in workers)) + + def test_worker_serialization(self): + workers = RQWorker.get_list({}) + frappe.get_doc("RQ Worker", workers[0].pid) diff --git a/frappe/database/database.py b/frappe/database/database.py index 07f8162ef7..b1cda25656 100644 --- a/frappe/database/database.py +++ b/frappe/database/database.py @@ -1300,12 +1300,23 @@ class Database: def enqueue_jobs_after_commit(): - from frappe.utils.background_jobs import execute_job, get_queue + from frappe.utils.background_jobs import ( + RQ_JOB_FAILURE_TTL, + RQ_RESULTS_TTL, + execute_job, + get_queue, + ) if frappe.flags.enqueue_after_commit and len(frappe.flags.enqueue_after_commit) > 0: for job in frappe.flags.enqueue_after_commit: q = get_queue(job.get("queue"), is_async=job.get("is_async")) - q.enqueue_call(execute_job, timeout=job.get("timeout"), kwargs=job.get("queue_args")) + q.enqueue_call( + execute_job, + timeout=job.get("timeout"), + kwargs=job.get("queue_args"), + 
failure_ttl=RQ_JOB_FAILURE_TTL, + result_ttl=RQ_RESULTS_TTL, + ) frappe.flags.enqueue_after_commit = [] diff --git a/frappe/public/js/frappe/desk.js b/frappe/public/js/frappe/desk.js index d942c3b849..c8d3d6d0d6 100644 --- a/frappe/public/js/frappe/desk.js +++ b/frappe/public/js/frappe/desk.js @@ -147,17 +147,6 @@ frappe.Application = class Application { this.link_preview = new frappe.ui.LinkPreview(); if (!frappe.boot.developer_mode) { - setInterval(function () { - frappe.call({ - method: "frappe.core.page.background_jobs.background_jobs.get_scheduler_status", - callback: function (r) { - if (r.message[0] == __("Inactive")) { - frappe.call("frappe.utils.scheduler.activate_scheduler"); - } - }, - }); - }, 300000); // check every 5 minutes - if (frappe.user.has_role("System Manager")) { setInterval(function () { frappe.call({ diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 3d3df3504d..9fb76beb17 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -9,6 +9,7 @@ from uuid import uuid4 import redis from redis.exceptions import BusyLoadingError, ConnectionError from rq import Connection, Queue, Worker +from rq.command import send_stop_job_command from rq.logutils import setup_loghandlers from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed @@ -23,6 +24,11 @@ if TYPE_CHECKING: from rq.job import Job +# TTL to keep RQ job logs in redis for. +RQ_JOB_FAILURE_TTL = 7 * 24 * 60 * 60 # 7 days instead of 1 year (default) +RQ_RESULTS_TTL = 10 * 60 + + @lru_cache def get_queues_timeout(): common_site_config = frappe.get_conf() @@ -103,7 +109,14 @@ def enqueue( ) return frappe.flags.enqueue_after_commit - return q.enqueue_call(execute_job, timeout=timeout, kwargs=queue_args, at_front=at_front) + return q.enqueue_call( + execute_job, + timeout=timeout, + kwargs=queue_args, + at_front=at_front, + failure_ttl=RQ_JOB_FAILURE_TTL, + result_ttl=RQ_RESULTS_TTL, + ) def enqueue_doc( diff --git a/frappe/utils/response.py b/frappe/utils/response.py index ed2cee6208..7b04b702e9 100644 --- a/frappe/utils/response.py +++ b/frappe/utils/response.py @@ -182,6 +182,9 @@ def json_handler(obj): elif type(obj) == type or isinstance(obj, Exception): return repr(obj) + elif callable(obj): + return repr(obj) + else: raise TypeError( f"""Object of type {type(obj)} with value of {repr(obj)} is not JSON serializable""" diff --git a/frappe/utils/scheduler.py b/frappe/utils/scheduler.py index 23b4949c38..d7393c4d45 100755 --- a/frappe/utils/scheduler.py +++ b/frappe/utils/scheduler.py @@ -177,3 +177,10 @@ def activate_scheduler(): enable_scheduler() if frappe.conf.pause_scheduler: update_site_config("pause_scheduler", 0) + + +@frappe.whitelist() +def get_scheduler_status(): + if is_scheduler_inactive(): + return {"status": "inactive"} + return {"status": "active"}
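A minimal usage sketch of the new RQ Job virtual doctype from server-side code (e.g. a bench console). It mirrors the calls that test_rq_job.py already exercises and assumes the session user is Administrator or a System Manager, since stop_job and remove_failed_jobs both enforce frappe.only_for("System Manager").

```python
# Minimal sketch: driving the RQ Job virtual doctype from server-side code.
# Assumes the current user is Administrator/System Manager, since stop_job()
# and remove_failed_jobs() both call frappe.only_for("System Manager").
import frappe
from frappe.core.doctype.rq_job.rq_job import RQJob, remove_failed_jobs, stop_job

# Filtering and site-scoping happen inside RQJob.get_list (there is no SQL table).
failed_jobs = RQJob.get_list({"filters": [["RQ Job", "status", "=", "failed"]]})
for row in failed_jobs:
    print(row.name, row.job_name, row.exc_info)

# A single job document is rebuilt from Redis on every load_from_db() call.
if failed_jobs:
    doc = frappe.get_doc("RQ Job", failed_jobs[0].name)
    print(doc.status, doc.started_at, doc.time_taken)

# stop_job() sends rq's stop-job command to terminate a queued/started job;
# remove_failed_jobs() clears this site's failed-job registries in batches of 100.
# stop_job(job_id=some_job_id)  # some_job_id is a placeholder for a real job name
remove_failed_jobs()
```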
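The TTL constants added in background_jobs.py are now passed to enqueue_call from both frappe.enqueue and enqueue_jobs_after_commit, so failed jobs expire from Redis after 7 days (instead of rq's 1-year default) and results after 10 minutes. A minimal sketch, assuming async workers and that enqueue() is not deferred via enqueue_after_commit (in which case it returns the pending-job list rather than an rq Job):

```python
# Sketch: jobs enqueued through frappe.enqueue now carry the new TTLs.
# frappe.ping is used only as a harmless importable target.
import frappe
from frappe.utils.background_jobs import RQ_JOB_FAILURE_TTL, RQ_RESULTS_TTL

job = frappe.enqueue("frappe.ping", queue="short")

assert job.failure_ttl == RQ_JOB_FAILURE_TTL  # 7 * 24 * 60 * 60 seconds
assert job.result_ttl == RQ_RESULTS_TTL       # 10 * 60 seconds
```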
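On the same pattern, RQ Worker exposes live worker state read from Redis, and the new whitelisted frappe.utils.scheduler.get_scheduler_status is what rq_job_list.js polls for its list-view indicator (replacing the removed auto-activation loop in desk.js). A short server-side sketch:

```python
# Sketch: read live worker state and the scheduler status server-side.
from frappe.core.doctype.rq_worker.rq_worker import RQWorker
from frappe.utils.scheduler import get_scheduler_status

for worker in RQWorker.get_list({}):
    print(worker.pid, worker.queue_type, worker.status, worker.current_job_id)

print(get_scheduler_status())  # {"status": "active"} or {"status": "inactive"}
```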