소스 검색

refactor: frappe.utils.user

* Refactored raw SQLs and outdated APIs to use newer DB + QB APIs
* Simplified Python & Query logic
* Added type hints
* Styled with Black
version-14
Gavin D'souza 3 년 전
부모
커밋
b99cd7ab3b
1개의 변경된 파일168개의 추가작업 그리고 121개의 파일을 삭제
  1. +168
    -121
      frappe/utils/user.py

+ 168
- 121
frappe/utils/user.py 파일 보기

@@ -1,15 +1,22 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE

import frappe, json
from frappe import _dict
from email.utils import formataddr
from typing import Dict, List, Optional, TYPE_CHECKING

import frappe
import frappe.share
from frappe.utils import cint
from frappe import _dict
from frappe.boot import get_allowed_reports
from frappe.permissions import get_roles, get_valid_perms
from frappe.core.doctype.domain_settings.domain_settings import get_active_modules
from frappe.permissions import get_roles, get_valid_perms
from frappe.query_builder import DocType
from frappe.query_builder.functions import Concat_ws
from frappe.query_builder import Order

if TYPE_CHECKING:
from frappe.core.doctype.user.user import User


class UserPermissions:
"""
@@ -150,10 +157,8 @@ class UserPermissions:
self.can_write += self.in_create
self.can_read += self.can_write

self.shared = frappe.db.sql_list("""select distinct share_doctype from `tabDocShare`
where `user`=%s and `read`=1""", self.name)
self.shared = frappe.get_all("DocShare", {"user": self.name, "read": 1}, distinct=True, pluck="share_doctype")
self.can_read = list(set(self.can_read + self.shared))

self.all_read += self.can_read

for dt in no_list_view_link:
@@ -161,11 +166,12 @@ class UserPermissions:
self.can_read.remove(dt)

if "System Manager" in self.get_roles():
docs = frappe.get_all("DocType", {'allow_import': 1})
self.can_import += [doc.name for doc in docs]

customizations = frappe.get_all("Property Setter", fields=['doc_type'], filters={'property': 'allow_import', 'value': "1"})
self.can_import += [custom.doc_type for custom in customizations]
self.can_import += frappe.get_all("DocType", {'allow_import': 1}, pluck="name")
self.can_import += frappe.get_all(
"Property Setter",
pluck="doc_type",
filters={"property": "allow_import", "value": "1"},
)

frappe.cache().hset("can_import", frappe.session.user, self.can_import)

@@ -186,10 +192,24 @@ class UserPermissions:
return self.can_read

def load_user(self):
d = frappe.db.sql("""select email, first_name, last_name, creation,
email_signature, user_type, desk_theme, language,
mute_sounds, send_me_a_copy, document_follow_notify
from tabUser where name = %s""", (self.name,), as_dict=1)[0]
d = frappe.db.get_value(
"User",
self.name,
[
"creation",
"desk_theme",
"document_follow_notify",
"email",
"email_signature",
"first_name",
"language",
"last_name",
"mute_sounds",
"send_me_a_copy",
"user_type",
],
as_dict=True,
)

if not self.can_read:
self.build_permissions()
@@ -209,142 +229,169 @@ class UserPermissions:
def get_all_reports(self):
return get_allowed_reports()

def get_user_fullname(user):

def get_user_fullname(user: str) -> str:
user_doctype = DocType("User")
fullname = frappe.get_value(
user_doctype,
filters={"name": user},
fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name),
return (
frappe.get_value(
user_doctype,
filters={"name": user},
fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name),
)
or ""
)
return fullname or ''

def get_fullname_and_avatar(user):
first_name, last_name, avatar, name = frappe.db.get_value("User",
user, ["first_name", "last_name", "user_image", "name"])
return _dict({
"fullname": " ".join(list(filter(None, [first_name, last_name]))),
"avatar": avatar,
"name": name
})

def get_system_managers(only_name=False):


def get_fullname_and_avatar(user: str) -> _dict:
first_name, last_name, avatar, name = frappe.db.get_value(
"User", user, ["first_name", "last_name", "user_image", "name"]
)
return _dict(
{
"fullname": " ".join(list(filter(None, [first_name, last_name]))),
"avatar": avatar,
"name": name,
}
)


def get_system_managers(only_name: bool = False) -> List[str]:
"""returns all system manager's user details"""
import email.utils
system_managers = frappe.db.sql("""SELECT DISTINCT `name`, `creation`,
CONCAT_WS(' ',
CASE WHEN `first_name`= '' THEN NULL ELSE `first_name` END,
CASE WHEN `last_name`= '' THEN NULL ELSE `last_name` END
) AS fullname
FROM `tabUser` AS p
WHERE `docstatus` < 2
AND `enabled` = 1
AND `name` NOT IN ({})
AND exists
(SELECT *
FROM `tabHas Role` AS ur
WHERE ur.parent = p.name
AND ur.role='System Manager')
ORDER BY `creation` DESC""".format(", ".join(["%s"]*len(frappe.STANDARD_USERS))),
frappe.STANDARD_USERS, as_dict=True)
HasRole = DocType("Has Role")
User = DocType("User")

if only_name:
fields = [User.name]
else:
fields = [User.full_name, User.name]

system_managers = (
frappe.qb.from_(User)
.join(HasRole)
.on((HasRole.parent == User.name))
.where(
(HasRole.parenttype == "User")
& (User.enabled == 1)
& (HasRole.role == "System Manager")
& (User.docstatus < 2)
& (User.name.notin(frappe.STANDARD_USERS))
)
.select(*fields)
.orderby(User.creation, order=Order.desc)
.run(as_dict=True)
)

if only_name:
return [p.name for p in system_managers]
else:
return [email.utils.formataddr((p.fullname, p.name)) for p in system_managers]
return [formataddr((p.full_name, p.name)) for p in system_managers]


def add_role(user, role):
def add_role(user: str, role: str) -> None:
frappe.get_doc("User", user).add_roles(role)

def add_system_manager(email, first_name=None, last_name=None, send_welcome_email=False, password=None):

def add_system_manager(
email: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
send_welcome_email: bool = False,
password: str = None,
) -> "User":
# add user
user = frappe.new_doc("User")
user.update({
"name": email,
"email": email,
"enabled": 1,
"first_name": first_name or email,
"last_name": last_name,
"user_type": "System User",
"send_welcome_email": 1 if send_welcome_email else 0
})
user.update(
{
"name": email,
"email": email,
"enabled": 1,
"first_name": first_name or email,
"last_name": last_name,
"user_type": "System User",
"send_welcome_email": 1 if send_welcome_email else 0,
}
)

user.insert()

# add roles
roles = frappe.get_all('Role',
fields=['name'],
filters={
'name': ['not in', ('Administrator', 'Guest', 'All')]
}
roles = frappe.get_all(
"Role",
fields=["name"],
filters={"name": ["not in", ("Administrator", "Guest", "All")]},
)
roles = [role.name for role in roles]
user.add_roles(*roles)

if password:
from frappe.utils.password import update_password

update_password(user=user.name, pwd=password)
return user

def get_enabled_system_users():
# add more fields if required
return frappe.get_all('User',
fields=['email', 'language', 'name'],

def get_enabled_system_users() -> List[Dict]:
return frappe.get_all(
"User",
fields=["email", "language", "name"],
filters={
'user_type': 'System User',
'enabled': 1,
'name': ['not in', ('Administrator', 'Guest')]
}
"user_type": "System User",
"enabled": 1,
"name": ["not in", ("Administrator", "Guest")],
},
)

def is_website_user():
return frappe.db.get_value('User', frappe.session.user, 'user_type') == "Website User"

def is_system_user(username):
return frappe.db.get_value("User", {"email": username, "enabled": 1, "user_type": "System User"})
def is_website_user(username: Optional[str] = None) -> Optional[str]:
return (
frappe.db.get_value("User", username or frappe.session.user, "user_type")
== "Website User"
)


def is_system_user(username: Optional[str] = None) -> Optional[str]:
return frappe.db.get_value(
"User",
{
"email": username or frappe.session.user,
"enabled": 1,
"user_type": "System User",
},
)


def get_users():
def get_users() -> List[Dict]:
from frappe.core.doctype.user.user import get_system_users

users = []
system_managers = frappe.utils.user.get_system_managers(only_name=True)
system_managers = get_system_managers(only_name=True)

for user in get_system_users():
users.append({
"full_name": frappe.utils.user.get_user_fullname(user),
"email": user,
"is_system_manager": 1 if (user in system_managers) else 0
})
users.append(
{
"full_name": get_user_fullname(user),
"email": user,
"is_system_manager": user in system_managers,
}
)

return users

def set_last_active_to_now(user):
from frappe.utils import now_datetime
frappe.db.set_value("User", user, "last_active", now_datetime())


def reset_simultaneous_sessions(user_limit):
for user in frappe.db.sql("""select name, simultaneous_sessions from tabUser
where name not in ('Administrator', 'Guest') and user_type = 'System User' and enabled=1
order by creation desc""", as_dict=1):
if user.simultaneous_sessions < user_limit:
user_limit = user_limit - user.simultaneous_sessions
else:
frappe.db.set_value("User", user.name, "simultaneous_sessions", 1)
user_limit = user_limit - 1

def get_link_to_reset_password(user):
link = ''

if not cint(frappe.db.get_single_value('System Settings', 'setup_complete')):
user = frappe.get_doc("User", user)
link = user.reset_password(send_email=False)
frappe.db.commit()

return {
'link': link
}

def get_users_with_role(role):
return [p[0] for p in frappe.db.sql("""SELECT DISTINCT `tabUser`.`name`
FROM `tabHas Role`, `tabUser`
WHERE `tabHas Role`.`role`=%s
AND `tabUser`.`name`!='Administrator'
AND `tabHas Role`.`parent`=`tabUser`.`name`
AND `tabUser`.`enabled`=1""", role)]

def get_users_with_role(role: str) -> List[str]:
User = DocType("User")
HasRole = DocType("Has Role")

return (
frappe.qb.from_(HasRole)
.from_(User)
.where(
(HasRole.role == role)
& (User.name != "Administrator")
& (User.enabled == 1)
& (HasRole.parent == User.name)
)
.select(User.name)
.distinct()
.run(pluck=True)
)

불러오는 중...
취소
저장