diff --git a/frappe/deferred_insert.py b/frappe/deferred_insert.py index 48f8ddae26..28c77002f8 100644 --- a/frappe/deferred_insert.py +++ b/frappe/deferred_insert.py @@ -1,13 +1,28 @@ import json +from typing import TYPE_CHECKING, Dict, List, Union + +import redis import frappe from frappe.utils import cstr +if TYPE_CHECKING: + from frappe.model.document import Document + queue_prefix = "insert_queue_for_" -def deferred_insert(doctype, records): - frappe.cache().rpush(queue_prefix + doctype, records) +def deferred_insert(doctype: str, records: Union[List[Union[Dict, "Document"]], str]): + if isinstance(records, (dict, list)): + _records = json.dumps(records) + else: + _records = records + + try: + frappe.cache().rpush(f"{queue_prefix}{doctype}", _records) + except redis.exceptions.ConnectionError: + for record in records: + insert_record(record, doctype) def save_to_db(): @@ -30,19 +45,17 @@ def save_to_db(): frappe.db.commit() -def insert_record(record, doctype): - if not record.get("doctype"): - record["doctype"] = doctype +def insert_record(record: Union[Dict, "Document"], doctype: str): + setattr(record, "doctype", doctype) try: - doc = frappe.get_doc(record) - doc.insert() + frappe.get_doc(record).insert() except Exception as e: - print(e, doctype) + frappe.logger().error(f"Error while inserting deferred {doctype} record: {e}") -def get_key_name(key): +def get_key_name(key: str) -> str: return cstr(key).split("|")[1] -def get_doctype_name(key): +def get_doctype_name(key: str) -> str: return cstr(key).split(queue_prefix)[1] diff --git a/frappe/desk/doctype/route_history/route_history.py b/frappe/desk/doctype/route_history/route_history.py index cea1ed79e3..e712a5bb11 100644 --- a/frappe/desk/doctype/route_history/route_history.py +++ b/frappe/desk/doctype/route_history/route_history.py @@ -1,11 +1,11 @@ -# Copyright (c) 2021, Frappe Technologies and contributors +# Copyright (c) 2022, Frappe Technologies and contributors # License: MIT. See LICENSE -import json - import frappe from frappe.deferred_insert import deferred_insert as _deferred_insert from frappe.model.document import Document +from frappe.query_builder import DocType +from frappe.query_builder.functions import Count class RouteHistory(Document): @@ -14,31 +14,29 @@ class RouteHistory(Document): def flush_old_route_records(): """Deletes all route records except last 500 records per user""" - records_to_keep_limit = 500 - users = frappe.db.sql( - """ - SELECT `user` - FROM `tabRoute History` - GROUP BY `user` - HAVING count(`name`) > %(limit)s - """, - {"limit": records_to_keep_limit}, - ) + RouteHistory = DocType("Route History") + + users = ( + frappe.qb.from_(RouteHistory) + .select(RouteHistory.user) + .groupby(RouteHistory.user) + .having(Count(RouteHistory.name) > records_to_keep_limit) + ).run(pluck=True) for user in users: - user = user[0] - last_record_to_keep = frappe.db.get_all( + last_record_to_keep = frappe.get_all( "Route History", filters={"user": user}, - limit=1, limit_start=500, fields=["modified"], order_by="modified desc", + limit=1, ) frappe.db.delete( - "Route History", {"modified": ("<=", last_record_to_keep[0].modified), "user": user} + "Route History", + {"modified": ("<=", last_record_to_keep[0].modified), "user": user}, ) @@ -53,7 +51,7 @@ def deferred_insert(routes): for route in frappe.parse_json(routes) ] - _deferred_insert("Route History", json.dumps(routes)) + _deferred_insert("Route History", routes) @frappe.whitelist()