From b99cd7ab3b924b96e5d3a36334ac770b112e0524 Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Thu, 31 Mar 2022 16:15:02 +0530 Subject: [PATCH] refactor: frappe.utils.user * Refactored raw SQLs and outdated APIs to use newer DB + QB APIs * Simplified Python & Query logic * Added type hints * Styled with Black --- frappe/utils/user.py | 289 +++++++++++++++++++++++++------------------ 1 file changed, 168 insertions(+), 121 deletions(-) diff --git a/frappe/utils/user.py b/frappe/utils/user.py index c84b29b1c9..43d9d26ab8 100644 --- a/frappe/utils/user.py +++ b/frappe/utils/user.py @@ -1,15 +1,22 @@ -# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE -import frappe, json -from frappe import _dict +from email.utils import formataddr +from typing import Dict, List, Optional, TYPE_CHECKING + +import frappe import frappe.share -from frappe.utils import cint +from frappe import _dict from frappe.boot import get_allowed_reports -from frappe.permissions import get_roles, get_valid_perms from frappe.core.doctype.domain_settings.domain_settings import get_active_modules +from frappe.permissions import get_roles, get_valid_perms from frappe.query_builder import DocType from frappe.query_builder.functions import Concat_ws +from frappe.query_builder import Order + +if TYPE_CHECKING: + from frappe.core.doctype.user.user import User + class UserPermissions: """ @@ -150,10 +157,8 @@ class UserPermissions: self.can_write += self.in_create self.can_read += self.can_write - self.shared = frappe.db.sql_list("""select distinct share_doctype from `tabDocShare` - where `user`=%s and `read`=1""", self.name) + self.shared = frappe.get_all("DocShare", {"user": self.name, "read": 1}, distinct=True, pluck="share_doctype") self.can_read = list(set(self.can_read + self.shared)) - self.all_read += self.can_read for dt in no_list_view_link: @@ -161,11 +166,12 @@ class UserPermissions: self.can_read.remove(dt) if "System Manager" in self.get_roles(): - docs = frappe.get_all("DocType", {'allow_import': 1}) - self.can_import += [doc.name for doc in docs] - - customizations = frappe.get_all("Property Setter", fields=['doc_type'], filters={'property': 'allow_import', 'value': "1"}) - self.can_import += [custom.doc_type for custom in customizations] + self.can_import += frappe.get_all("DocType", {'allow_import': 1}, pluck="name") + self.can_import += frappe.get_all( + "Property Setter", + pluck="doc_type", + filters={"property": "allow_import", "value": "1"}, + ) frappe.cache().hset("can_import", frappe.session.user, self.can_import) @@ -186,10 +192,24 @@ class UserPermissions: return self.can_read def load_user(self): - d = frappe.db.sql("""select email, first_name, last_name, creation, - email_signature, user_type, desk_theme, language, - mute_sounds, send_me_a_copy, document_follow_notify - from tabUser where name = %s""", (self.name,), as_dict=1)[0] + d = frappe.db.get_value( + "User", + self.name, + [ + "creation", + "desk_theme", + "document_follow_notify", + "email", + "email_signature", + "first_name", + "language", + "last_name", + "mute_sounds", + "send_me_a_copy", + "user_type", + ], + as_dict=True, + ) if not self.can_read: self.build_permissions() @@ -209,142 +229,169 @@ class UserPermissions: def get_all_reports(self): return get_allowed_reports() -def get_user_fullname(user): + +def get_user_fullname(user: str) -> str: user_doctype = DocType("User") - fullname = frappe.get_value( - user_doctype, - filters={"name": user}, - fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name), + return ( + frappe.get_value( + user_doctype, + filters={"name": user}, + fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name), + ) + or "" ) - return fullname or '' - -def get_fullname_and_avatar(user): - first_name, last_name, avatar, name = frappe.db.get_value("User", - user, ["first_name", "last_name", "user_image", "name"]) - return _dict({ - "fullname": " ".join(list(filter(None, [first_name, last_name]))), - "avatar": avatar, - "name": name - }) - -def get_system_managers(only_name=False): + + +def get_fullname_and_avatar(user: str) -> _dict: + first_name, last_name, avatar, name = frappe.db.get_value( + "User", user, ["first_name", "last_name", "user_image", "name"] + ) + return _dict( + { + "fullname": " ".join(list(filter(None, [first_name, last_name]))), + "avatar": avatar, + "name": name, + } + ) + + +def get_system_managers(only_name: bool = False) -> List[str]: """returns all system manager's user details""" - import email.utils - system_managers = frappe.db.sql("""SELECT DISTINCT `name`, `creation`, - CONCAT_WS(' ', - CASE WHEN `first_name`= '' THEN NULL ELSE `first_name` END, - CASE WHEN `last_name`= '' THEN NULL ELSE `last_name` END - ) AS fullname - FROM `tabUser` AS p - WHERE `docstatus` < 2 - AND `enabled` = 1 - AND `name` NOT IN ({}) - AND exists - (SELECT * - FROM `tabHas Role` AS ur - WHERE ur.parent = p.name - AND ur.role='System Manager') - ORDER BY `creation` DESC""".format(", ".join(["%s"]*len(frappe.STANDARD_USERS))), - frappe.STANDARD_USERS, as_dict=True) + HasRole = DocType("Has Role") + User = DocType("User") + + if only_name: + fields = [User.name] + else: + fields = [User.full_name, User.name] + + system_managers = ( + frappe.qb.from_(User) + .join(HasRole) + .on((HasRole.parent == User.name)) + .where( + (HasRole.parenttype == "User") + & (User.enabled == 1) + & (HasRole.role == "System Manager") + & (User.docstatus < 2) + & (User.name.notin(frappe.STANDARD_USERS)) + ) + .select(*fields) + .orderby(User.creation, order=Order.desc) + .run(as_dict=True) + ) if only_name: return [p.name for p in system_managers] else: - return [email.utils.formataddr((p.fullname, p.name)) for p in system_managers] + return [formataddr((p.full_name, p.name)) for p in system_managers] + -def add_role(user, role): +def add_role(user: str, role: str) -> None: frappe.get_doc("User", user).add_roles(role) -def add_system_manager(email, first_name=None, last_name=None, send_welcome_email=False, password=None): + +def add_system_manager( + email: str, + first_name: Optional[str] = None, + last_name: Optional[str] = None, + send_welcome_email: bool = False, + password: str = None, +) -> "User": # add user user = frappe.new_doc("User") - user.update({ - "name": email, - "email": email, - "enabled": 1, - "first_name": first_name or email, - "last_name": last_name, - "user_type": "System User", - "send_welcome_email": 1 if send_welcome_email else 0 - }) + user.update( + { + "name": email, + "email": email, + "enabled": 1, + "first_name": first_name or email, + "last_name": last_name, + "user_type": "System User", + "send_welcome_email": 1 if send_welcome_email else 0, + } + ) user.insert() # add roles - roles = frappe.get_all('Role', - fields=['name'], - filters={ - 'name': ['not in', ('Administrator', 'Guest', 'All')] - } + roles = frappe.get_all( + "Role", + fields=["name"], + filters={"name": ["not in", ("Administrator", "Guest", "All")]}, ) roles = [role.name for role in roles] user.add_roles(*roles) if password: from frappe.utils.password import update_password + update_password(user=user.name, pwd=password) + return user -def get_enabled_system_users(): - # add more fields if required - return frappe.get_all('User', - fields=['email', 'language', 'name'], + +def get_enabled_system_users() -> List[Dict]: + return frappe.get_all( + "User", + fields=["email", "language", "name"], filters={ - 'user_type': 'System User', - 'enabled': 1, - 'name': ['not in', ('Administrator', 'Guest')] - } + "user_type": "System User", + "enabled": 1, + "name": ["not in", ("Administrator", "Guest")], + }, ) -def is_website_user(): - return frappe.db.get_value('User', frappe.session.user, 'user_type') == "Website User" -def is_system_user(username): - return frappe.db.get_value("User", {"email": username, "enabled": 1, "user_type": "System User"}) +def is_website_user(username: Optional[str] = None) -> Optional[str]: + return ( + frappe.db.get_value("User", username or frappe.session.user, "user_type") + == "Website User" + ) + + +def is_system_user(username: Optional[str] = None) -> Optional[str]: + return frappe.db.get_value( + "User", + { + "email": username or frappe.session.user, + "enabled": 1, + "user_type": "System User", + }, + ) + -def get_users(): +def get_users() -> List[Dict]: from frappe.core.doctype.user.user import get_system_users + users = [] - system_managers = frappe.utils.user.get_system_managers(only_name=True) + system_managers = get_system_managers(only_name=True) + for user in get_system_users(): - users.append({ - "full_name": frappe.utils.user.get_user_fullname(user), - "email": user, - "is_system_manager": 1 if (user in system_managers) else 0 - }) + users.append( + { + "full_name": get_user_fullname(user), + "email": user, + "is_system_manager": user in system_managers, + } + ) return users -def set_last_active_to_now(user): - from frappe.utils import now_datetime - frappe.db.set_value("User", user, "last_active", now_datetime()) - - -def reset_simultaneous_sessions(user_limit): - for user in frappe.db.sql("""select name, simultaneous_sessions from tabUser - where name not in ('Administrator', 'Guest') and user_type = 'System User' and enabled=1 - order by creation desc""", as_dict=1): - if user.simultaneous_sessions < user_limit: - user_limit = user_limit - user.simultaneous_sessions - else: - frappe.db.set_value("User", user.name, "simultaneous_sessions", 1) - user_limit = user_limit - 1 - -def get_link_to_reset_password(user): - link = '' - - if not cint(frappe.db.get_single_value('System Settings', 'setup_complete')): - user = frappe.get_doc("User", user) - link = user.reset_password(send_email=False) - frappe.db.commit() - - return { - 'link': link - } - -def get_users_with_role(role): - return [p[0] for p in frappe.db.sql("""SELECT DISTINCT `tabUser`.`name` - FROM `tabHas Role`, `tabUser` - WHERE `tabHas Role`.`role`=%s - AND `tabUser`.`name`!='Administrator' - AND `tabHas Role`.`parent`=`tabUser`.`name` - AND `tabUser`.`enabled`=1""", role)] + +def get_users_with_role(role: str) -> List[str]: + User = DocType("User") + HasRole = DocType("Has Role") + + return ( + frappe.qb.from_(HasRole) + .from_(User) + .where( + (HasRole.role == role) + & (User.name != "Administrator") + & (User.enabled == 1) + & (HasRole.parent == User.name) + ) + .select(User.name) + .distinct() + .run(pluck=True) + )