test: Added TestCase for NestedSet (Tree DocTypes)version-14
@@ -2,6 +2,7 @@ | |||||
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | ||||
# License: MIT. See LICENSE | # License: MIT. See LICENSE | ||||
import unittest | import unittest | ||||
from typing import Dict, List, Optional | |||||
import frappe | import frappe | ||||
from frappe.core.doctype.doctype.doctype import ( | from frappe.core.doctype.doctype.doctype import ( | ||||
@@ -524,7 +525,7 @@ class TestDocType(unittest.TestCase): | |||||
def test_autoincremented_doctype_transition(self): | def test_autoincremented_doctype_transition(self): | ||||
frappe.delete_doc("testy_autoinc_dt") | frappe.delete_doc("testy_autoinc_dt") | ||||
dt = new_doctype("testy_autoinc_dt", autoincremented=True).insert(ignore_permissions=True) | |||||
dt = new_doctype("testy_autoinc_dt", autoname="autoincrement").insert(ignore_permissions=True) | |||||
dt.autoname = "hash" | dt.autoname = "hash" | ||||
try: | try: | ||||
@@ -538,7 +539,9 @@ class TestDocType(unittest.TestCase): | |||||
dt.delete(ignore_permissions=True) | dt.delete(ignore_permissions=True) | ||||
def new_doctype(name, unique=0, depends_on="", fields=None, autoincremented=False): | |||||
def new_doctype( | |||||
name, unique: bool = False, depends_on: str = "", fields: Optional[List[Dict]] = None, **kwargs | |||||
): | |||||
doc = frappe.get_doc( | doc = frappe.get_doc( | ||||
{ | { | ||||
"doctype": "DocType", | "doctype": "DocType", | ||||
@@ -560,7 +563,7 @@ def new_doctype(name, unique=0, depends_on="", fields=None, autoincremented=Fals | |||||
} | } | ||||
], | ], | ||||
"name": name, | "name": name, | ||||
"autoname": "autoincrement" if autoincremented else "", | |||||
**kwargs, | |||||
} | } | ||||
) | ) | ||||
@@ -3,6 +3,7 @@ | |||||
import hashlib | import hashlib | ||||
import json | import json | ||||
import time | import time | ||||
from typing import List | |||||
from werkzeug.exceptions import NotFound | from werkzeug.exceptions import NotFound | ||||
@@ -20,9 +21,6 @@ from frappe.utils import cstr, date_diff, file_lock, flt, get_datetime_str, now | |||||
from frappe.utils.data import get_absolute_url | from frappe.utils.data import get_absolute_url | ||||
from frappe.utils.global_search import update_global_search | from frappe.utils.global_search import update_global_search | ||||
# once_only validation | |||||
# methods | |||||
def get_doc(*args, **kwargs): | def get_doc(*args, **kwargs): | ||||
"""returns a frappe.model.Document object. | """returns a frappe.model.Document object. | ||||
@@ -188,7 +186,7 @@ class Document(BaseDocument): | |||||
if not self.has_permission(permtype): | if not self.has_permission(permtype): | ||||
self.raise_no_permission_to(permlevel or permtype) | self.raise_no_permission_to(permlevel or permtype) | ||||
def has_permission(self, permtype="read", verbose=False): | |||||
def has_permission(self, permtype="read", verbose=False) -> bool: | |||||
"""Call `frappe.has_permission` if `self.flags.ignore_permissions` | """Call `frappe.has_permission` if `self.flags.ignore_permissions` | ||||
is not set. | is not set. | ||||
@@ -212,7 +210,7 @@ class Document(BaseDocument): | |||||
ignore_mandatory=None, | ignore_mandatory=None, | ||||
set_name=None, | set_name=None, | ||||
set_child_names=True, | set_child_names=True, | ||||
): | |||||
) -> "Document": | |||||
"""Insert the document in the database (as a new document). | """Insert the document in the database (as a new document). | ||||
This will check for user permissions and execute `before_insert`, | This will check for user permissions and execute `before_insert`, | ||||
`validate`, `on_update`, `after_insert` methods if they are written. | `validate`, `on_update`, `after_insert` methods if they are written. | ||||
@@ -294,7 +292,7 @@ class Document(BaseDocument): | |||||
"""Wrapper for _save""" | """Wrapper for _save""" | ||||
return self._save(*args, **kwargs) | return self._save(*args, **kwargs) | ||||
def _save(self, ignore_permissions=None, ignore_version=None): | |||||
def _save(self, ignore_permissions=None, ignore_version=None) -> "Document": | |||||
"""Save the current document in the database in the **DocType**'s table or | """Save the current document in the database in the **DocType**'s table or | ||||
`tabSingles` (for single types). | `tabSingles` (for single types). | ||||
@@ -524,8 +522,7 @@ class Document(BaseDocument): | |||||
self._save_passwords() | self._save_passwords() | ||||
self.validate_workflow() | self.validate_workflow() | ||||
children = self.get_all_children() | |||||
for d in children: | |||||
for d in self.get_all_children(): | |||||
d._validate_data_fields() | d._validate_data_fields() | ||||
d._validate_selects() | d._validate_selects() | ||||
d._validate_non_negative() | d._validate_non_negative() | ||||
@@ -890,7 +887,7 @@ class Document(BaseDocument): | |||||
msg = ", ".join((each[2] for each in cancelled_links)) | msg = ", ".join((each[2] for each in cancelled_links)) | ||||
frappe.throw(_("Cannot link cancelled document: {0}").format(msg), frappe.CancelledLinkError) | frappe.throw(_("Cannot link cancelled document: {0}").format(msg), frappe.CancelledLinkError) | ||||
def get_all_children(self, parenttype=None): | |||||
def get_all_children(self, parenttype=None) -> List["Document"]: | |||||
"""Returns all children documents from **Table** type fields in a list.""" | """Returns all children documents from **Table** type fields in a list.""" | ||||
children = [] | children = [] | ||||
@@ -759,7 +759,7 @@ class TestDDLCommandsPost(unittest.TestCase): | |||||
def test_sequence_table_creation(self): | def test_sequence_table_creation(self): | ||||
from frappe.core.doctype.doctype.test_doctype import new_doctype | from frappe.core.doctype.doctype.test_doctype import new_doctype | ||||
dt = new_doctype("autoinc_dt_seq_test", autoincremented=True).insert(ignore_permissions=True) | |||||
dt = new_doctype("autoinc_dt_seq_test", autoname="autoincrement").insert(ignore_permissions=True) | |||||
if frappe.db.db_type == "postgres": | if frappe.db.db_type == "postgres": | ||||
self.assertTrue( | self.assertTrue( | ||||
@@ -619,7 +619,7 @@ class TestReportview(unittest.TestCase): | |||||
def test_cast_name(self): | def test_cast_name(self): | ||||
from frappe.core.doctype.doctype.test_doctype import new_doctype | from frappe.core.doctype.doctype.test_doctype import new_doctype | ||||
dt = new_doctype("autoinc_dt_test", autoincremented=True).insert(ignore_permissions=True) | |||||
dt = new_doctype("autoinc_dt_test", autoname="autoincrement").insert(ignore_permissions=True) | |||||
query = DatabaseQuery("autoinc_dt_test").execute( | query = DatabaseQuery("autoinc_dt_test").execute( | ||||
fields=["locate('1', `tabautoinc_dt_test`.`name`)", "`tabautoinc_dt_test`.`name`"], | fields=["locate('1', `tabautoinc_dt_test`.`name`)", "`tabautoinc_dt_test`.`name`"], | ||||
@@ -262,7 +262,7 @@ class TestNaming(unittest.TestCase): | |||||
from frappe.core.doctype.doctype.test_doctype import new_doctype | from frappe.core.doctype.doctype.test_doctype import new_doctype | ||||
doctype = "autoinc_doctype" + frappe.generate_hash(length=5) | doctype = "autoinc_doctype" + frappe.generate_hash(length=5) | ||||
dt = new_doctype(doctype, autoincremented=True).insert(ignore_permissions=True) | |||||
dt = new_doctype(doctype, autoname="autoincrement").insert(ignore_permissions=True) | |||||
for i in range(1, 20): | for i in range(1, 20): | ||||
self.assertEqual(frappe.new_doc(doctype).save(ignore_permissions=True).name, i) | self.assertEqual(frappe.new_doc(doctype).save(ignore_permissions=True).name, i) | ||||
@@ -0,0 +1,235 @@ | |||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# License: MIT. See LICENSE | |||||
import frappe | |||||
from frappe.core.doctype.doctype.test_doctype import new_doctype | |||||
from frappe.query_builder import Field | |||||
from frappe.query_builder.functions import Max | |||||
from frappe.tests.utils import FrappeTestCase | |||||
from frappe.utils.nestedset import ( | |||||
NestedSetChildExistsError, | |||||
NestedSetInvalidMergeError, | |||||
NestedSetRecursionError, | |||||
get_descendants_of, | |||||
rebuild_tree, | |||||
) | |||||
records = [ | |||||
{ | |||||
"some_fieldname": "Root Node", | |||||
"parent_test_tree_doctype": None, | |||||
"is_group": 1, | |||||
}, | |||||
{ | |||||
"some_fieldname": "Parent 1", | |||||
"parent_test_tree_doctype": "Root Node", | |||||
"is_group": 1, | |||||
}, | |||||
{ | |||||
"some_fieldname": "Parent 2", | |||||
"parent_test_tree_doctype": "Root Node", | |||||
"is_group": 1, | |||||
}, | |||||
{ | |||||
"some_fieldname": "Child 1", | |||||
"parent_test_tree_doctype": "Parent 1", | |||||
"is_group": 0, | |||||
}, | |||||
{ | |||||
"some_fieldname": "Child 2", | |||||
"parent_test_tree_doctype": "Parent 1", | |||||
"is_group": 0, | |||||
}, | |||||
{ | |||||
"some_fieldname": "Child 3", | |||||
"parent_test_tree_doctype": "Parent 2", | |||||
"is_group": 0, | |||||
}, | |||||
] | |||||
class NestedSetTestUtil: | |||||
def setup_test_doctype(self): | |||||
frappe.db.sql("delete from `tabDocType` where `name` = 'Test Tree DocType'") | |||||
frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") | |||||
self.tree_doctype = new_doctype( | |||||
"Test Tree DocType", is_tree=True, autoname="field:some_fieldname" | |||||
) | |||||
self.tree_doctype.insert() | |||||
for record in records: | |||||
d = frappe.new_doc("Test Tree DocType") | |||||
d.update(record) | |||||
d.insert() | |||||
def teardown_test_doctype(self): | |||||
self.tree_doctype.delete() | |||||
frappe.db.sql_ddl("drop table if exists `tabTest Tree DocType`") | |||||
def move_it_back(self): | |||||
parent_1 = frappe.get_doc("Test Tree DocType", "Parent 1") | |||||
parent_1.parent_test_tree_doctype = "Root Node" | |||||
parent_1.save() | |||||
def get_no_of_children(self, record_name: str) -> int: | |||||
if not record_name: | |||||
return frappe.db.count("Test Tree DocType") | |||||
return len(get_descendants_of("Test Tree DocType", record_name, ignore_permissions=True)) | |||||
class TestNestedSet(FrappeTestCase): | |||||
@classmethod | |||||
def setUpClass(cls) -> None: | |||||
cls.nsu = NestedSetTestUtil() | |||||
cls.nsu.setup_test_doctype() | |||||
super().setUpClass() | |||||
@classmethod | |||||
def tearDownClass(cls) -> None: | |||||
cls.nsu.teardown_test_doctype() | |||||
super().tearDownClass() | |||||
def setUp(self) -> None: | |||||
frappe.db.rollback() | |||||
def test_basic_tree(self): | |||||
global records | |||||
min_lft = 1 | |||||
max_rgt = frappe.qb.from_("Test Tree DocType").select(Max(Field("rgt"))).run(pluck=True)[0] | |||||
for record in records: | |||||
lft, rgt, parent_test_tree_doctype = frappe.db.get_value( | |||||
"Test Tree DocType", | |||||
record["some_fieldname"], | |||||
["lft", "rgt", "parent_test_tree_doctype"], | |||||
) | |||||
if parent_test_tree_doctype: | |||||
parent_lft, parent_rgt = frappe.db.get_value( | |||||
"Test Tree DocType", parent_test_tree_doctype, ["lft", "rgt"] | |||||
) | |||||
else: | |||||
# root | |||||
parent_lft = min_lft - 1 | |||||
parent_rgt = max_rgt + 1 | |||||
self.assertTrue(lft) | |||||
self.assertTrue(rgt) | |||||
self.assertTrue(lft < rgt) | |||||
self.assertTrue(parent_lft < parent_rgt) | |||||
self.assertTrue(lft > parent_lft) | |||||
self.assertTrue(rgt < parent_rgt) | |||||
self.assertTrue(lft >= min_lft) | |||||
self.assertTrue(rgt <= max_rgt) | |||||
no_of_children = self.nsu.get_no_of_children(record["some_fieldname"]) | |||||
self.assertTrue( | |||||
rgt == (lft + 1 + (2 * no_of_children)), | |||||
msg=(record, no_of_children, self.nsu.get_no_of_children(record["some_fieldname"])), | |||||
) | |||||
no_of_children = self.nsu.get_no_of_children(parent_test_tree_doctype) | |||||
self.assertTrue(parent_rgt == (parent_lft + 1 + (2 * no_of_children))) | |||||
def test_recursion(self): | |||||
leaf_node = frappe.get_doc("Test Tree DocType", {"some_fieldname": "Parent 2"}) | |||||
leaf_node.parent_test_tree_doctype = "Child 3" | |||||
self.assertRaises(NestedSetRecursionError, leaf_node.save) | |||||
leaf_node.reload() | |||||
def test_rebuild_tree(self): | |||||
rebuild_tree("Test Tree DocType", "parent_test_tree_doctype") | |||||
self.test_basic_tree() | |||||
def test_move_group_into_another(self): | |||||
old_lft, old_rgt = frappe.db.get_value("Test Tree DocType", "Parent 2", ["lft", "rgt"]) | |||||
parent_1 = frappe.get_doc("Test Tree DocType", "Parent 1") | |||||
lft, rgt = parent_1.lft, parent_1.rgt | |||||
parent_1.parent_test_tree_doctype = "Parent 2" | |||||
parent_1.save() | |||||
self.test_basic_tree() | |||||
# after move | |||||
new_lft, new_rgt = frappe.db.get_value("Test Tree DocType", "Parent 2", ["lft", "rgt"]) | |||||
# lft should reduce | |||||
self.assertEqual(old_lft - new_lft, rgt - lft + 1) | |||||
# adjacent siblings, hence rgt diff will be 0 | |||||
self.assertEqual(new_rgt - old_rgt, 0) | |||||
self.nsu.move_it_back() | |||||
self.test_basic_tree() | |||||
def test_move_leaf_into_another_group(self): | |||||
child_2 = frappe.get_doc("Test Tree DocType", "Child 2") | |||||
# assert that child 2 is not already under parent 1 | |||||
parent_lft_old, parent_rgt_old = frappe.db.get_value( | |||||
"Test Tree DocType", "Parent 2", ["lft", "rgt"] | |||||
) | |||||
self.assertTrue((parent_lft_old > child_2.lft) and (parent_rgt_old > child_2.rgt)) | |||||
child_2.parent_test_tree_doctype = "Parent 2" | |||||
child_2.save() | |||||
self.test_basic_tree() | |||||
# assert that child 2 is under parent 1 | |||||
parent_lft_new, parent_rgt_new = frappe.db.get_value( | |||||
"Test Tree DocType", "Parent 2", ["lft", "rgt"] | |||||
) | |||||
self.assertFalse((parent_lft_new > child_2.lft) and (parent_rgt_new > child_2.rgt)) | |||||
def test_delete_leaf(self): | |||||
global records | |||||
el = {"some_fieldname": "Child 1", "parent_test_tree_doctype": "Parent 1", "is_group": 0} | |||||
child_1 = frappe.get_doc("Test Tree DocType", "Child 1") | |||||
child_1.delete() | |||||
records.remove(el) | |||||
self.test_basic_tree() | |||||
n = frappe.new_doc("Test Tree DocType") | |||||
n.update(el) | |||||
n.insert() | |||||
records.append(el) | |||||
self.test_basic_tree() | |||||
def test_delete_group(self): | |||||
# cannot delete group with child, but can delete leaf | |||||
with self.assertRaises(NestedSetChildExistsError): | |||||
frappe.delete_doc("Test Tree DocType", "Parent 1") | |||||
def test_merge_groups(self): | |||||
global records | |||||
el = {"some_fieldname": "Parent 2", "parent_test_tree_doctype": "Root Node", "is_group": 1} | |||||
frappe.rename_doc("Test Tree DocType", "Parent 2", "Parent 1", merge=True) | |||||
records.remove(el) | |||||
self.test_basic_tree() | |||||
def test_merge_leaves(self): | |||||
global records | |||||
el = {"some_fieldname": "Child 3", "parent_test_tree_doctype": "Parent 2", "is_group": 0} | |||||
frappe.rename_doc( | |||||
"Test Tree DocType", | |||||
"Child 3", | |||||
"Child 2", | |||||
merge=True, | |||||
) | |||||
records.remove(el) | |||||
self.test_basic_tree() | |||||
def test_merge_leaf_into_group(self): | |||||
with self.assertRaises(NestedSetInvalidMergeError): | |||||
frappe.rename_doc("Test Tree DocType", "Child 1", "Parent 1", merge=True) | |||||
def test_merge_group_into_leaf(self): | |||||
with self.assertRaises(NestedSetInvalidMergeError): | |||||
frappe.rename_doc("Test Tree DocType", "Parent 1", "Child 1", merge=True) |