diff --git a/frappe/core/doctype/doctype/test_doctype.py b/frappe/core/doctype/doctype/test_doctype.py index 135172e8da..031cbb553f 100644 --- a/frappe/core/doctype/doctype/test_doctype.py +++ b/frappe/core/doctype/doctype/test_doctype.py @@ -2,6 +2,7 @@ # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE import unittest +from typing import Dict, List, Optional import frappe from frappe.core.doctype.doctype.doctype import ( @@ -524,7 +525,7 @@ class TestDocType(unittest.TestCase): def test_autoincremented_doctype_transition(self): frappe.delete_doc("testy_autoinc_dt") - dt = new_doctype("testy_autoinc_dt", autoincremented=True).insert(ignore_permissions=True) + dt = new_doctype("testy_autoinc_dt", autoname="autoincrement").insert(ignore_permissions=True) dt.autoname = "hash" try: @@ -538,7 +539,9 @@ class TestDocType(unittest.TestCase): dt.delete(ignore_permissions=True) -def new_doctype(name, unique=0, depends_on="", fields=None, autoincremented=False): +def new_doctype( + name, unique: bool = False, depends_on: str = "", fields: Optional[List[Dict]] = None, **kwargs +): doc = frappe.get_doc( { "doctype": "DocType", @@ -560,7 +563,7 @@ def new_doctype(name, unique=0, depends_on="", fields=None, autoincremented=Fals } ], "name": name, - "autoname": "autoincrement" if autoincremented else "", + **kwargs, } ) diff --git a/frappe/model/document.py b/frappe/model/document.py index ef2aa9a6dc..07ea58d8e9 100644 --- a/frappe/model/document.py +++ b/frappe/model/document.py @@ -3,6 +3,7 @@ import hashlib import json import time +from typing import List from werkzeug.exceptions import NotFound @@ -20,9 +21,6 @@ from frappe.utils import cstr, date_diff, file_lock, flt, get_datetime_str, now from frappe.utils.data import get_absolute_url from frappe.utils.global_search import update_global_search -# once_only validation -# methods - def get_doc(*args, **kwargs): """returns a frappe.model.Document object. @@ -188,7 +186,7 @@ class Document(BaseDocument): if not self.has_permission(permtype): self.raise_no_permission_to(permlevel or permtype) - def has_permission(self, permtype="read", verbose=False): + def has_permission(self, permtype="read", verbose=False) -> bool: """Call `frappe.has_permission` if `self.flags.ignore_permissions` is not set. @@ -212,7 +210,7 @@ class Document(BaseDocument): ignore_mandatory=None, set_name=None, set_child_names=True, - ): + ) -> "Document": """Insert the document in the database (as a new document). This will check for user permissions and execute `before_insert`, `validate`, `on_update`, `after_insert` methods if they are written. @@ -294,7 +292,7 @@ class Document(BaseDocument): """Wrapper for _save""" return self._save(*args, **kwargs) - def _save(self, ignore_permissions=None, ignore_version=None): + def _save(self, ignore_permissions=None, ignore_version=None) -> "Document": """Save the current document in the database in the **DocType**'s table or `tabSingles` (for single types). @@ -524,8 +522,7 @@ class Document(BaseDocument): self._save_passwords() self.validate_workflow() - children = self.get_all_children() - for d in children: + for d in self.get_all_children(): d._validate_data_fields() d._validate_selects() d._validate_non_negative() @@ -890,7 +887,7 @@ class Document(BaseDocument): msg = ", ".join((each[2] for each in cancelled_links)) frappe.throw(_("Cannot link cancelled document: {0}").format(msg), frappe.CancelledLinkError) - def get_all_children(self, parenttype=None): + def get_all_children(self, parenttype=None) -> List["Document"]: """Returns all children documents from **Table** type fields in a list.""" children = [] diff --git a/frappe/tests/test_db.py b/frappe/tests/test_db.py index 624f346716..f722ad1d65 100644 --- a/frappe/tests/test_db.py +++ b/frappe/tests/test_db.py @@ -759,7 +759,7 @@ class TestDDLCommandsPost(unittest.TestCase): def test_sequence_table_creation(self): from frappe.core.doctype.doctype.test_doctype import new_doctype - dt = new_doctype("autoinc_dt_seq_test", autoincremented=True).insert(ignore_permissions=True) + dt = new_doctype("autoinc_dt_seq_test", autoname="autoincrement").insert(ignore_permissions=True) if frappe.db.db_type == "postgres": self.assertTrue( diff --git a/frappe/tests/test_db_query.py b/frappe/tests/test_db_query.py index 90b047b3cd..8bdd66a045 100644 --- a/frappe/tests/test_db_query.py +++ b/frappe/tests/test_db_query.py @@ -619,7 +619,7 @@ class TestReportview(unittest.TestCase): def test_cast_name(self): from frappe.core.doctype.doctype.test_doctype import new_doctype - dt = new_doctype("autoinc_dt_test", autoincremented=True).insert(ignore_permissions=True) + dt = new_doctype("autoinc_dt_test", autoname="autoincrement").insert(ignore_permissions=True) query = DatabaseQuery("autoinc_dt_test").execute( fields=["locate('1', `tabautoinc_dt_test`.`name`)", "`tabautoinc_dt_test`.`name`"], diff --git a/frappe/tests/test_naming.py b/frappe/tests/test_naming.py index e57d2ae4cd..33974e5d27 100644 --- a/frappe/tests/test_naming.py +++ b/frappe/tests/test_naming.py @@ -262,7 +262,7 @@ class TestNaming(unittest.TestCase): from frappe.core.doctype.doctype.test_doctype import new_doctype doctype = "autoinc_doctype" + frappe.generate_hash(length=5) - dt = new_doctype(doctype, autoincremented=True).insert(ignore_permissions=True) + dt = new_doctype(doctype, autoname="autoincrement").insert(ignore_permissions=True) for i in range(1, 20): self.assertEqual(frappe.new_doc(doctype).save(ignore_permissions=True).name, i) diff --git a/frappe/tests/test_nestedset.py b/frappe/tests/test_nestedset.py new file mode 100644 index 0000000000..2d601ec185 --- /dev/null +++ b/frappe/tests/test_nestedset.py @@ -0,0 +1,235 @@ +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors +# License: MIT. See LICENSE + +import frappe +from frappe.core.doctype.doctype.test_doctype import new_doctype +from frappe.query_builder import Field +from frappe.query_builder.functions import Max +from frappe.tests.utils import FrappeTestCase +from frappe.utils.nestedset import ( + NestedSetChildExistsError, + NestedSetInvalidMergeError, + NestedSetRecursionError, + get_descendants_of, + rebuild_tree, +) + +records = [ + { + "some_fieldname": "Root Node", + "parent_test_tree_doctype": None, + "is_group": 1, + }, + { + "some_fieldname": "Parent 1", + "parent_test_tree_doctype": "Root Node", + "is_group": 1, + }, + { + "some_fieldname": "Parent 2", + "parent_test_tree_doctype": "Root Node", + "is_group": 1, + }, + { + "some_fieldname": "Child 1", + "parent_test_tree_doctype": "Parent 1", + "is_group": 0, + }, + { + "some_fieldname": "Child 2", + "parent_test_tree_doctype": "Parent 1", + "is_group": 0, + }, + { + "some_fieldname": "Child 3", + "parent_test_tree_doctype": "Parent 2", + "is_group": 0, + }, +] + + +class NestedSetTestUtil: + def setup_test_doctype(self): + frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'") + frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") + + self.tree_doctype = new_doctype( + "Test Tree DocType", is_tree=True, autoname="field:some_fieldname" + ) + self.tree_doctype.insert() + + for record in records: + d = frappe.new_doc("Test Tree DocType") + d.update(record) + d.insert() + + def teardown_test_doctype(self): + self.tree_doctype.delete() + frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") + + def move_it_back(self): + parent_1 = frappe.get_doc("Test Tree DocType", "Parent 1") + parent_1.parent_test_tree_doctype = "Root Node" + parent_1.save() + + def get_no_of_children(self, record_name: str) -> int: + if not record_name: + return frappe.db.count("Test Tree DocType") + return len(get_descendants_of("Test Tree DocType", record_name, ignore_permissions=True)) + + +class TestNestedSet(FrappeTestCase): + @classmethod + def setUpClass(cls) -> None: + cls.nsu = NestedSetTestUtil() + cls.nsu.setup_test_doctype() + super().setUpClass() + + @classmethod + def tearDownClass(cls) -> None: + cls.nsu.teardown_test_doctype() + super().tearDownClass() + + def setUp(self) -> None: + frappe.db.rollback() + + def test_basic_tree(self): + global records + + min_lft = 1 + max_rgt = frappe.qb.from_("Test Tree DocType").select(Max(Field("rgt"))).run(pluck=True)[0] + + for record in records: + lft, rgt, parent_test_tree_doctype = frappe.db.get_value( + "Test Tree DocType", + record["some_fieldname"], + ["lft", "rgt", "parent_test_tree_doctype"], + ) + + if parent_test_tree_doctype: + parent_lft, parent_rgt = frappe.db.get_value( + "Test Tree DocType", parent_test_tree_doctype, ["lft", "rgt"] + ) + else: + # root + parent_lft = min_lft - 1 + parent_rgt = max_rgt + 1 + + self.assertTrue(lft) + self.assertTrue(rgt) + self.assertTrue(lft < rgt) + self.assertTrue(parent_lft < parent_rgt) + self.assertTrue(lft > parent_lft) + self.assertTrue(rgt < parent_rgt) + self.assertTrue(lft >= min_lft) + self.assertTrue(rgt <= max_rgt) + + no_of_children = self.nsu.get_no_of_children(record["some_fieldname"]) + self.assertTrue( + rgt == (lft + 1 + (2 * no_of_children)), + msg=(record, no_of_children, self.nsu.get_no_of_children(record["some_fieldname"])), + ) + + no_of_children = self.nsu.get_no_of_children(parent_test_tree_doctype) + self.assertTrue(parent_rgt == (parent_lft + 1 + (2 * no_of_children))) + + def test_recursion(self): + leaf_node = frappe.get_doc("Test Tree DocType", {"some_fieldname": "Parent 2"}) + leaf_node.parent_test_tree_doctype = "Child 3" + self.assertRaises(NestedSetRecursionError, leaf_node.save) + leaf_node.reload() + + def test_rebuild_tree(self): + rebuild_tree("Test Tree DocType", "parent_test_tree_doctype") + self.test_basic_tree() + + def test_move_group_into_another(self): + old_lft, old_rgt = frappe.db.get_value("Test Tree DocType", "Parent 2", ["lft", "rgt"]) + + parent_1 = frappe.get_doc("Test Tree DocType", "Parent 1") + lft, rgt = parent_1.lft, parent_1.rgt + + parent_1.parent_test_tree_doctype = "Parent 2" + parent_1.save() + self.test_basic_tree() + + # after move + new_lft, new_rgt = frappe.db.get_value("Test Tree DocType", "Parent 2", ["lft", "rgt"]) + + # lft should reduce + self.assertEqual(old_lft - new_lft, rgt - lft + 1) + + # adjacent siblings, hence rgt diff will be 0 + self.assertEqual(new_rgt - old_rgt, 0) + + self.nsu.move_it_back() + self.test_basic_tree() + + def test_move_leaf_into_another_group(self): + child_2 = frappe.get_doc("Test Tree DocType", "Child 2") + + # assert that child 2 is not already under parent 1 + parent_lft_old, parent_rgt_old = frappe.db.get_value( + "Test Tree DocType", "Parent 2", ["lft", "rgt"] + ) + self.assertTrue((parent_lft_old > child_2.lft) and (parent_rgt_old > child_2.rgt)) + + child_2.parent_test_tree_doctype = "Parent 2" + child_2.save() + self.test_basic_tree() + + # assert that child 2 is under parent 1 + parent_lft_new, parent_rgt_new = frappe.db.get_value( + "Test Tree DocType", "Parent 2", ["lft", "rgt"] + ) + self.assertFalse((parent_lft_new > child_2.lft) and (parent_rgt_new > child_2.rgt)) + + def test_delete_leaf(self): + global records + el = {"some_fieldname": "Child 1", "parent_test_tree_doctype": "Parent 1", "is_group": 0} + + child_1 = frappe.get_doc("Test Tree DocType", "Child 1") + child_1.delete() + records.remove(el) + + self.test_basic_tree() + + n = frappe.new_doc("Test Tree DocType") + n.update(el) + n.insert() + records.append(el) + + self.test_basic_tree() + + def test_delete_group(self): + # cannot delete group with child, but can delete leaf + with self.assertRaises(NestedSetChildExistsError): + frappe.delete_doc("Test Tree DocType", "Parent 1") + + def test_merge_groups(self): + global records + el = {"some_fieldname": "Parent 2", "parent_test_tree_doctype": "Root Node", "is_group": 1} + frappe.rename_doc("Test Tree DocType", "Parent 2", "Parent 1", merge=True) + records.remove(el) + self.test_basic_tree() + + def test_merge_leaves(self): + global records + el = {"some_fieldname": "Child 3", "parent_test_tree_doctype": "Parent 2", "is_group": 0} + + frappe.rename_doc( + "Test Tree DocType", + "Child 3", + "Child 2", + merge=True, + ) + records.remove(el) + self.test_basic_tree() + + def test_merge_leaf_into_group(self): + with self.assertRaises(NestedSetInvalidMergeError): + frappe.rename_doc("Test Tree DocType", "Child 1", "Parent 1", merge=True) + + def test_merge_group_into_leaf(self): + with self.assertRaises(NestedSetInvalidMergeError): + frappe.rename_doc("Test Tree DocType", "Parent 1", "Child 1", merge=True)