fix(utils): Misc fixesversion-14
@@ -780,9 +780,8 @@ def set_user_password(site, user, password, logout_all_sessions=False): | |||||
@pass_context | @pass_context | ||||
def set_last_active_for_user(context, user=None): | def set_last_active_for_user(context, user=None): | ||||
"Set users last active date to current datetime" | "Set users last active date to current datetime" | ||||
from frappe.core.doctype.user.user import get_system_users | from frappe.core.doctype.user.user import get_system_users | ||||
from frappe.utils.user import set_last_active_to_now | |||||
from frappe.utils import now_datetime | |||||
site = get_site(context) | site = get_site(context) | ||||
@@ -795,9 +794,10 @@ def set_last_active_for_user(context, user=None): | |||||
else: | else: | ||||
return | return | ||||
set_last_active_to_now(user) | |||||
frappe.db.set_value("User", user, "last_active", now_datetime()) | |||||
frappe.db.commit() | frappe.db.commit() | ||||
@click.command('publish-realtime') | @click.command('publish-realtime') | ||||
@click.argument('event') | @click.argument('event') | ||||
@click.option('--message') | @click.option('--message') | ||||
@@ -115,21 +115,23 @@ def change_orderby(order: str): | |||||
OPERATOR_MAP = { | OPERATOR_MAP = { | ||||
"+": operator.add, | |||||
"=": operator.eq, | |||||
"-": operator.sub, | |||||
"!=": operator.ne, | |||||
"<": operator.lt, | |||||
">": operator.gt, | |||||
"<=": operator.le, | |||||
">=": operator.ge, | |||||
"in": func_in, | |||||
"not in": func_not_in, | |||||
"like": like, | |||||
"not like": not_like, | |||||
"regex": func_regex, | |||||
"between": func_between | |||||
} | |||||
"+": operator.add, | |||||
"=": operator.eq, | |||||
"-": operator.sub, | |||||
"!=": operator.ne, | |||||
"<": operator.lt, | |||||
">": operator.gt, | |||||
"<=": operator.le, | |||||
"=<": operator.le, | |||||
">=": operator.ge, | |||||
"=>": operator.ge, | |||||
"in": func_in, | |||||
"not in": func_not_in, | |||||
"like": like, | |||||
"not like": not_like, | |||||
"regex": func_regex, | |||||
"between": func_between, | |||||
} | |||||
class Query: | class Query: | ||||
@@ -211,8 +213,7 @@ class Query: | |||||
_operator = OPERATOR_MAP[f[1]] | _operator = OPERATOR_MAP[f[1]] | ||||
conditions = conditions.where(_operator(Field(f[0]), f[2])) | conditions = conditions.where(_operator(Field(f[0]), f[2])) | ||||
conditions = self.add_conditions(conditions, **kwargs) | |||||
return conditions | |||||
return self.add_conditions(conditions, **kwargs) | |||||
def dict_query(self, table: str, filters: Dict[str, Union[str, int]] = None, **kwargs) -> frappe.qb: | def dict_query(self, table: str, filters: Dict[str, Union[str, int]] = None, **kwargs) -> frappe.qb: | ||||
"""Build conditions using the given dictionary filters | """Build conditions using the given dictionary filters | ||||
@@ -251,8 +252,7 @@ class Query: | |||||
field = getattr(_table, key) | field = getattr(_table, key) | ||||
conditions = conditions.where(field.isnull()) | conditions = conditions.where(field.isnull()) | ||||
conditions = self.add_conditions(conditions, **kwargs) | |||||
return conditions | |||||
return self.add_conditions(conditions, **kwargs) | |||||
def build_conditions( | def build_conditions( | ||||
self, | self, | ||||
@@ -43,6 +43,10 @@ CombineDatetime = ImportMapper( | |||||
} | } | ||||
) | ) | ||||
DateFormat = ImportMapper({ | |||||
db_type_is.MARIADB: CustomFunction("DATE_FORMAT", ["date", "format"]), | |||||
db_type_is.POSTGRES: ToChar, | |||||
}) | |||||
class Cast_(Function): | class Cast_(Function): | ||||
def __init__(self, value, as_type, alias=None): | def __init__(self, value, as_type, alias=None): | ||||
@@ -1,10 +1,12 @@ | |||||
from datetime import timedelta | from datetime import timedelta | ||||
from typing import Any, Dict, Optional | from typing import Any, Dict, Optional | ||||
from frappe.utils.data import format_timedelta | |||||
from pypika.terms import Function, ValueWrapper | |||||
from pypika.queries import QueryBuilder | |||||
from pypika.terms import Criterion, Function, ValueWrapper | |||||
from pypika.utils import format_alias_sql | from pypika.utils import format_alias_sql | ||||
from frappe.utils.data import format_timedelta | |||||
class NamedParameterWrapper: | class NamedParameterWrapper: | ||||
"""Utility class to hold parameter values and keys""" | """Utility class to hold parameter values and keys""" | ||||
@@ -100,3 +102,12 @@ class ParameterizedFunction(Function): | |||||
) | ) | ||||
return function_sql | return function_sql | ||||
class subqry(Criterion): | |||||
def __init__(self, subq: QueryBuilder, alias: Optional[str] = None,) -> None: | |||||
super().__init__(alias) | |||||
self.subq = subq | |||||
def get_sql(self, **kwg: Any) -> str: | |||||
kwg["subquery"] = True | |||||
return self.subq.get_sql(**kwg) |
@@ -1,14 +1,15 @@ | |||||
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# License: MIT. See LICENSE | # License: MIT. See LICENSE | ||||
import unittest | import unittest | ||||
import frappe | import frappe | ||||
from frappe.utils import global_search | |||||
from frappe.test_runner import make_test_objects | |||||
from frappe.custom.doctype.property_setter.property_setter import make_property_setter | |||||
from frappe.desk.page.setup_wizard.install_fixtures import update_global_search_doctypes | from frappe.desk.page.setup_wizard.install_fixtures import update_global_search_doctypes | ||||
from frappe.utils import global_search, now_datetime | |||||
from frappe.test_runner import make_test_objects | |||||
import frappe.utils | |||||
class TestGlobalSearch(unittest.TestCase): | class TestGlobalSearch(unittest.TestCase): | ||||
def setUp(self): | def setUp(self): | ||||
@@ -17,7 +18,6 @@ class TestGlobalSearch(unittest.TestCase): | |||||
self.assertTrue('__global_search' in frappe.db.get_tables()) | self.assertTrue('__global_search' in frappe.db.get_tables()) | ||||
doctype = "Event" | doctype = "Event" | ||||
global_search.reset() | global_search.reset() | ||||
from frappe.custom.doctype.property_setter.property_setter import make_property_setter | |||||
make_property_setter(doctype, "subject", "in_global_search", 1, "Int") | make_property_setter(doctype, "subject", "in_global_search", 1, "Int") | ||||
make_property_setter(doctype, "event_type", "in_global_search", 1, "Int") | make_property_setter(doctype, "event_type", "in_global_search", 1, "Int") | ||||
make_property_setter(doctype, "roles", "in_global_search", 1, "Int") | make_property_setter(doctype, "roles", "in_global_search", 1, "Int") | ||||
@@ -42,12 +42,11 @@ class TestGlobalSearch(unittest.TestCase): | |||||
doctype='Event', | doctype='Event', | ||||
subject=text, | subject=text, | ||||
repeat_on='Monthly', | repeat_on='Monthly', | ||||
starts_on=frappe.utils.now_datetime())).insert() | |||||
starts_on=now_datetime())).insert() | |||||
global_search.sync_global_search() | global_search.sync_global_search() | ||||
frappe.db.commit() | frappe.db.commit() | ||||
def test_search(self): | def test_search(self): | ||||
self.insert_test_events() | self.insert_test_events() | ||||
results = global_search.search('awakens') | results = global_search.search('awakens') | ||||
@@ -75,7 +74,6 @@ class TestGlobalSearch(unittest.TestCase): | |||||
results = global_search.search('Monthly') | results = global_search.search('Monthly') | ||||
self.assertEqual(len(results), 0) | self.assertEqual(len(results), 0) | ||||
doctype = "Event" | doctype = "Event" | ||||
from frappe.custom.doctype.property_setter.property_setter import make_property_setter | |||||
make_property_setter(doctype, "repeat_on", "in_global_search", 1, "Int") | make_property_setter(doctype, "repeat_on", "in_global_search", 1, "Int") | ||||
global_search.rebuild_for_doctype(doctype) | global_search.rebuild_for_doctype(doctype) | ||||
results = global_search.search('Monthly') | results = global_search.search('Monthly') | ||||
@@ -91,6 +89,7 @@ class TestGlobalSearch(unittest.TestCase): | |||||
frappe.delete_doc('Event', event_name) | frappe.delete_doc('Event', event_name) | ||||
global_search.sync_global_search() | global_search.sync_global_search() | ||||
frappe.db.commit() | |||||
results = global_search.search(test_subject) | results = global_search.search(test_subject) | ||||
self.assertTrue(all(r["name"] != event_name for r in results), msg="Deleted documents appearing in global search.") | self.assertTrue(all(r["name"] != event_name for r in results), msg="Deleted documents appearing in global search.") | ||||
@@ -111,7 +110,7 @@ class TestGlobalSearch(unittest.TestCase): | |||||
doc = frappe.get_doc({ | doc = frappe.get_doc({ | ||||
'doctype':'Event', | 'doctype':'Event', | ||||
'subject': text, | 'subject': text, | ||||
'starts_on': frappe.utils.now_datetime() | |||||
'starts_on': now_datetime() | |||||
}) | }) | ||||
doc.insert() | doc.insert() | ||||
@@ -172,7 +171,7 @@ class TestGlobalSearch(unittest.TestCase): | |||||
doc = frappe.get_doc({ | doc = frappe.get_doc({ | ||||
'doctype':'Event', | 'doctype':'Event', | ||||
'subject': 'Lorem Ipsum', | 'subject': 'Lorem Ipsum', | ||||
'starts_on': frappe.utils.now_datetime(), | |||||
'starts_on': now_datetime(), | |||||
'description': case["data"] | 'description': case["data"] | ||||
}) | }) | ||||
@@ -1,33 +1,50 @@ | |||||
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# License: MIT. See LICENSE | # License: MIT. See LICENSE | ||||
import unittest | |||||
import frappe | import frappe | ||||
from frappe.utils.goal import get_monthly_results, get_monthly_goal_graph_data | |||||
from frappe.test_runner import make_test_objects | from frappe.test_runner import make_test_objects | ||||
import frappe.utils | |||||
from frappe.utils import format_date, today | |||||
from frappe.utils.goal import get_monthly_goal_graph_data, get_monthly_results | |||||
from frappe.tests.utils import FrappeTestCase | |||||
class TestGoal(unittest.TestCase): | |||||
class TestGoal(FrappeTestCase): | |||||
def setUp(self): | def setUp(self): | ||||
make_test_objects('Event', reset=True) | |||||
make_test_objects("Event", reset=True) | |||||
def tearDown(self): | def tearDown(self): | ||||
frappe.db.delete("Event") | frappe.db.delete("Event") | ||||
# make_test_objects('Event', reset=True) | |||||
frappe.db.commit() | |||||
def test_get_monthly_results(self): | def test_get_monthly_results(self): | ||||
'''Test monthly aggregation values of a field''' | |||||
result_dict = get_monthly_results('Event', 'subject', 'creation', "event_type='Private'", 'count') | |||||
"""Test monthly aggregation values of a field""" | |||||
result_dict = get_monthly_results( | |||||
"Event", | |||||
"subject", | |||||
"creation", | |||||
filters={"event_type": "Private"}, | |||||
aggregation="count", | |||||
) | |||||
from frappe.utils import today, formatdate | |||||
self.assertEqual(result_dict.get(formatdate(today(), "MM-yyyy")), 2) | |||||
self.assertEqual(result_dict.get(format_date(today(), "MM-yyyy")), 2) | |||||
def test_get_monthly_goal_graph_data(self): | def test_get_monthly_goal_graph_data(self): | ||||
'''Test for accurate values in graph data (based on test_get_monthly_results)''' | |||||
docname = frappe.get_list('Event', filters = {"subject": ["=", "_Test Event 1"]})[0]["name"] | |||||
frappe.db.set_value('Event', docname, 'description', 1) | |||||
data = get_monthly_goal_graph_data('Test', 'Event', docname, 'description', 'description', 'description', | |||||
'Event', '', 'description', 'creation', "starts_on = '2014-01-01'", 'count') | |||||
self.assertEqual(float(data['data']['datasets'][0]['values'][-1]), 1) | |||||
"""Test for accurate values in graph data (based on test_get_monthly_results)""" | |||||
docname = frappe.get_list("Event", filters={"subject": ["=", "_Test Event 1"]})[ | |||||
0 | |||||
]["name"] | |||||
frappe.db.set_value("Event", docname, "description", 1) | |||||
data = get_monthly_goal_graph_data( | |||||
"Test", | |||||
"Event", | |||||
docname, | |||||
"description", | |||||
"description", | |||||
"description", | |||||
"Event", | |||||
"", | |||||
"description", | |||||
"creation", | |||||
filters={"starts_on": "2014-01-01"}, | |||||
aggregation="count", | |||||
) | |||||
self.assertEqual(float(data["data"]["datasets"][0]["values"][-1]), 1) |
@@ -176,9 +176,13 @@ def collect_error_snapshots(): | |||||
def clear_old_snapshots(): | def clear_old_snapshots(): | ||||
"""Clear snapshots that are older than a month""" | """Clear snapshots that are older than a month""" | ||||
from frappe.query_builder import DocType, Interval | |||||
from frappe.query_builder.functions import Now | |||||
frappe.db.sql("""delete from `tabError Snapshot` | |||||
where creation < (NOW() - INTERVAL '1' MONTH)""") | |||||
ErrorSnapshot = DocType("Error Snapshot") | |||||
frappe.db.delete(ErrorSnapshot, filters=( | |||||
ErrorSnapshot.creation < (Now() - Interval(months=1)) | |||||
)) | |||||
path = get_error_snapshot_path() | path = get_error_snapshot_path() | ||||
today = datetime.datetime.now() | today = datetime.datetime.now() | ||||
@@ -6,6 +6,7 @@ import os, base64, re, json | |||||
import hashlib | import hashlib | ||||
import mimetypes | import mimetypes | ||||
import io | import io | ||||
from frappe.query_builder.utils import DocType | |||||
from frappe.utils import get_hook_method, get_files_path, random_string, encode, cstr, call_hook_method, cint | from frappe.utils import get_hook_method, get_files_path, random_string, encode, cstr, call_hook_method, cint | ||||
from frappe import _ | from frappe import _ | ||||
from frappe import conf | from frappe import conf | ||||
@@ -176,7 +177,7 @@ def save_file(fname, content, dt, dn, folder=None, decode=False, is_private=0, d | |||||
def get_file_data_from_hash(content_hash, is_private=0): | def get_file_data_from_hash(content_hash, is_private=0): | ||||
for name in frappe.db.sql_list("select name from `tabFile` where content_hash=%s and is_private=%s", (content_hash, is_private)): | |||||
for name in frappe.get_all("File", {"content_hash": content_hash, "is_private": is_private}, pluck="name"): | |||||
b = frappe.get_doc('File', name) | b = frappe.get_doc('File', name) | ||||
return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']} | return {k: b.get(k) for k in frappe.get_hooks()['write_file_keys']} | ||||
return False | return False | ||||
@@ -230,8 +231,7 @@ def write_file(content, fname, is_private=0): | |||||
def remove_all(dt, dn, from_delete=False, delete_permanently=False): | def remove_all(dt, dn, from_delete=False, delete_permanently=False): | ||||
"""remove all files in a transaction""" | """remove all files in a transaction""" | ||||
try: | try: | ||||
for fid in frappe.db.sql_list("""select name from `tabFile` where | |||||
attached_to_doctype=%s and attached_to_name=%s""", (dt, dn)): | |||||
for fid in frappe.get_all("File", {"attached_to_doctype": dt, "attached_to_name": dn}, pluck="name"): | |||||
if from_delete: | if from_delete: | ||||
# If deleting a doc, directly delete files | # If deleting a doc, directly delete files | ||||
frappe.delete_doc("File", fid, ignore_permissions=True, delete_permanently=delete_permanently) | frappe.delete_doc("File", fid, ignore_permissions=True, delete_permanently=delete_permanently) | ||||
@@ -319,8 +319,10 @@ def get_file_path(file_name): | |||||
if '../' in file_name: | if '../' in file_name: | ||||
return | return | ||||
f = frappe.db.sql("""select file_url from `tabFile` | |||||
where name=%s or file_name=%s""", (file_name, file_name)) | |||||
File = DocType("File") | |||||
f = frappe.qb.from_(File).where((File.name == file_name) & (File.file_name == file_name)).select(File.file_url).run() | |||||
if f: | if f: | ||||
file_name = f[0][0] | file_name = f[0][0] | ||||
@@ -351,7 +353,7 @@ def get_file_name(fname, optional_suffix): | |||||
# convert to unicode | # convert to unicode | ||||
fname = cstr(fname) | fname = cstr(fname) | ||||
n_records = frappe.db.sql("select name from `tabFile` where file_name=%s", fname) | |||||
n_records = frappe.get_all("File", {"file_name": fname}, pluck="name") | |||||
if len(n_records) > 0 or os.path.exists(encode(get_files_path(fname))): | if len(n_records) > 0 or os.path.exists(encode(get_files_path(fname))): | ||||
f = fname.rsplit('.', 1) | f = fname.rsplit('.', 1) | ||||
if len(f) == 1: | if len(f) == 1: | ||||
@@ -1,157 +1,149 @@ | |||||
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# License: MIT. See LICENSE | # License: MIT. See LICENSE | ||||
from typing import Dict, Optional | |||||
import frappe | import frappe | ||||
from frappe import _ | from frappe import _ | ||||
from frappe.query_builder.functions import DateFormat, Function | |||||
from frappe.query_builder.utils import DocType | |||||
from frappe.utils.data import add_to_date, cstr, flt, now_datetime | |||||
from frappe.utils.formatters import format_value | |||||
from contextlib import suppress | |||||
def get_monthly_results( | |||||
goal_doctype: str, | |||||
goal_field: str, | |||||
date_col: str, | |||||
filters: Dict, | |||||
aggregation: str = "sum", | |||||
) -> Dict: | |||||
"""Get monthly aggregation values for given field of doctype""" | |||||
Table = DocType(goal_doctype) | |||||
date_format = "%m-%Y" if frappe.db.db_type != "postgres" else "MM-YYYY" | |||||
return dict( | |||||
frappe.db.query.build_conditions(table=goal_doctype, filters=filters) | |||||
.select( | |||||
DateFormat(Table[date_col], date_format).as_("month_year"), | |||||
Function(aggregation, goal_field), | |||||
) | |||||
.groupby("month_year") | |||||
.run() | |||||
) | |||||
def get_monthly_results(goal_doctype, goal_field, date_col, filter_str, aggregation = 'sum'): | |||||
'''Get monthly aggregation values for given field of doctype''' | |||||
# TODO: move to ORM? | |||||
if(frappe.db.db_type == 'postgres'): | |||||
month_year_format_query = '''to_char("{}", 'MM-YYYY')'''.format(date_col) | |||||
else: | |||||
month_year_format_query = 'date_format(`{}`, "%m-%Y")'.format(date_col) | |||||
conditions = ('where ' + filter_str) if filter_str else '' | |||||
results = frappe.db.sql('''SELECT {aggregation}(`{goal_field}`) AS {goal_field}, | |||||
{month_year_format_query} AS month_year | |||||
FROM `{table_name}` {conditions} | |||||
GROUP BY month_year''' | |||||
.format( | |||||
aggregation=aggregation, | |||||
goal_field=goal_field, | |||||
month_year_format_query=month_year_format_query, | |||||
table_name="tab" + goal_doctype, | |||||
conditions=conditions | |||||
), as_dict=True) | |||||
month_to_value_dict = {} | |||||
for d in results: | |||||
month_to_value_dict[d['month_year']] = d[goal_field] | |||||
return month_to_value_dict | |||||
@frappe.whitelist() | @frappe.whitelist() | ||||
def get_monthly_goal_graph_data(title, doctype, docname, goal_value_field, goal_total_field, goal_history_field, | |||||
goal_doctype, goal_doctype_link, goal_field, date_field, filter_str, aggregation="sum"): | |||||
''' | |||||
Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype | |||||
:param title: Graph title | |||||
:param doctype: doctype of graph doc | |||||
:param docname: of the doc to set the graph in | |||||
:param goal_value_field: goal field of doctype | |||||
:param goal_total_field: current month value field of doctype | |||||
:param goal_history_field: cached history field | |||||
:param goal_doctype: doctype the goal is based on | |||||
:param goal_doctype_link: doctype link field in goal_doctype | |||||
:param goal_field: field from which the goal is calculated | |||||
:param filter_str: where clause condition | |||||
:param aggregation: a value like 'count', 'sum', 'avg' | |||||
:return: dict of graph data | |||||
''' | |||||
from frappe.utils.formatters import format_value | |||||
import json | |||||
# should have atleast read perm | |||||
if not frappe.has_permission(goal_doctype): | |||||
return None | |||||
meta = frappe.get_meta(doctype) | |||||
def get_monthly_goal_graph_data( | |||||
title: str, | |||||
doctype: str, | |||||
docname: str, | |||||
goal_value_field: str, | |||||
goal_total_field: str, | |||||
goal_history_field: str, | |||||
goal_doctype: str, | |||||
goal_doctype_link: str, | |||||
goal_field: str, | |||||
date_field: str, | |||||
filter_str: str = None, | |||||
aggregation: str = "sum", | |||||
filters: Optional[Dict] = None, | |||||
) -> Dict: | |||||
""" | |||||
Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype | |||||
:param title: Graph title | |||||
:param doctype: doctype of graph doc | |||||
:param docname: of the doc to set the graph in | |||||
:param goal_value_field: goal field of doctype | |||||
:param goal_total_field: current month value field of doctype | |||||
:param goal_history_field: cached history field | |||||
:param goal_doctype: doctype the goal is based on | |||||
:param goal_doctype_link: doctype link field in goal_doctype | |||||
:param goal_field: field from which the goal is calculated | |||||
:param filter_str: [DEPRECATED] where clause condition. Use filters. | |||||
:param aggregation: a value like 'count', 'sum', 'avg' | |||||
:param filters: optional filters | |||||
:return: dict of graph data | |||||
""" | |||||
if isinstance(filter_str, str): | |||||
frappe.throw("String filters have been deprecated. Pass Dict filters instead.", exc=DeprecationWarning) # nosemgrep | |||||
doc = frappe.get_doc(doctype, docname) | doc = frappe.get_doc(doctype, docname) | ||||
doc.check_permission() | |||||
meta = doc.meta | |||||
goal = doc.get(goal_value_field) | goal = doc.get(goal_value_field) | ||||
formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc) | |||||
today_date = now_datetime().date() | |||||
current_month_value = doc.get(goal_total_field) | current_month_value = doc.get(goal_total_field) | ||||
formatted_value = format_value(current_month_value, meta.get_field(goal_total_field), doc) | |||||
current_month_year = today_date.strftime("%m-%Y") # eg: "02-2022" | |||||
formatted_value = format_value( | |||||
current_month_value, meta.get_field(goal_total_field), doc | |||||
) | |||||
history = doc.get(goal_history_field) | |||||
from frappe.utils import today, getdate, formatdate, add_months | |||||
current_month_year = formatdate(today(), "MM-yyyy") | |||||
month_to_value_dict = None | |||||
if history and "{" in cstr(history): | |||||
with suppress(ValueError): | |||||
month_to_value_dict = frappe.parse_json(history) | |||||
history = doc.get(goal_history_field) | |||||
try: | |||||
month_to_value_dict = json.loads(history) if history and '{' in history else None | |||||
except ValueError: | |||||
month_to_value_dict = None | |||||
if month_to_value_dict is None: # nosemgrep | |||||
doc_filter = {} | |||||
with suppress(ValueError): | |||||
doc_filter = frappe.parse_json(filters or "{}") | |||||
if doctype != goal_doctype: | |||||
doc_filter[goal_doctype_link] = docname | |||||
if month_to_value_dict is None: | |||||
doc_filter = (goal_doctype_link + " = " + frappe.db.escape(docname)) if doctype != goal_doctype else '' | |||||
if filter_str: | |||||
doc_filter += ' and ' + filter_str if doc_filter else filter_str | |||||
month_to_value_dict = get_monthly_results(goal_doctype, goal_field, date_field, doc_filter, aggregation) | |||||
month_to_value_dict = get_monthly_results( | |||||
goal_doctype, goal_field, date_field, doc_filter, aggregation | |||||
) | |||||
month_to_value_dict[current_month_year] = current_month_value | month_to_value_dict[current_month_year] = current_month_value | ||||
months = [] | |||||
months_formatted = [] | |||||
values = [] | |||||
month_labels = [] | |||||
dataset_values = [] | |||||
values_formatted = [] | values_formatted = [] | ||||
for i in range(0, 12): | |||||
date_value = add_months(today(), -i) | |||||
month_value = formatdate(date_value, "MM-yyyy") | |||||
month_word = getdate(date_value).strftime('%b %y') | |||||
month_year = getdate(date_value).strftime('%B') + ', ' + getdate(date_value).strftime('%Y') | |||||
months.insert(0, month_word) | |||||
months_formatted.insert(0, month_year) | |||||
if month_value in month_to_value_dict: | |||||
val = month_to_value_dict[month_value] | |||||
else: | |||||
val = 0 | |||||
values.insert(0, val) | |||||
values_formatted.insert(0, format_value(val, meta.get_field(goal_total_field), doc)) | |||||
y_markers = [] | |||||
y_markers = {} | |||||
summary_values = [ | summary_values = [ | ||||
{ | |||||
'title': _("This month"), | |||||
'color': '#ffa00a', | |||||
'value': formatted_value | |||||
} | |||||
{"title": _("This month"), "color": "#ffa00a", "value": formatted_value}, | |||||
] | ] | ||||
if float(goal) > 0: | |||||
y_markers = [ | |||||
{ | |||||
'label': _("Goal"), | |||||
'lineType': "dashed", | |||||
'value': goal | |||||
}, | |||||
] | |||||
if flt(goal) > 0: | |||||
formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc) | |||||
summary_values += [ | summary_values += [ | ||||
{"title": _("Goal"), "color": "#5e64ff", "value": formatted_goal}, | |||||
{ | { | ||||
'title': _("Goal"), | |||||
'color': '#5e64ff', | |||||
'value': formatted_goal | |||||
"title": _("Completed"), | |||||
"color": "#28a745", | |||||
"value": f"{int(round(flt(current_month_value) / flt(goal) * 100))}%", | |||||
}, | }, | ||||
{ | |||||
'title': _("Completed"), | |||||
'color': '#28a745', | |||||
'value': str(int(round(float(current_month_value)/float(goal)*100))) + "%" | |||||
} | |||||
] | ] | ||||
y_markers = { | |||||
"yMarkers": [{"label": _("Goal"), "lineType": "dashed", "value": flt(goal)}] | |||||
} | |||||
data = { | |||||
'title': title, | |||||
# 'subtitle': | |||||
'data': { | |||||
'datasets': [ | |||||
{ | |||||
'values': values, | |||||
'formatted': values_formatted | |||||
} | |||||
], | |||||
'labels': months, | |||||
for i in range(12): | |||||
date_value = add_to_date(today_date, months=-i, as_datetime=True) | |||||
month_word = date_value.strftime("%b %y") # eg: "Feb 22" | |||||
month_labels.insert(0, month_word) | |||||
month_value = date_value.strftime("%m-%Y") # eg: "02-2022" | |||||
val = month_to_value_dict.get(month_value, 0) | |||||
dataset_values.insert(0, val) | |||||
values_formatted.insert( | |||||
0, format_value(val, meta.get_field(goal_total_field), doc) | |||||
) | |||||
return { | |||||
"title": title, | |||||
"data": { | |||||
"datasets": [{"values": dataset_values, "formatted": values_formatted}], | |||||
"labels": month_labels, | |||||
**y_markers, | |||||
}, | }, | ||||
'summary': summary_values, | |||||
"summary": summary_values, | |||||
} | } | ||||
if y_markers: | |||||
data["data"]["yMarkers"] = y_markers | |||||
return data |
@@ -34,7 +34,7 @@ def after_install(): | |||||
print_settings.save() | print_settings.save() | ||||
# all roles to admin | # all roles to admin | ||||
frappe.get_doc("User", "Administrator").add_roles(*frappe.db.sql_list("""select name from tabRole""")) | |||||
frappe.get_doc("User", "Administrator").add_roles(*frappe.get_all("Role", pluck="name")) | |||||
# update admin password | # update admin password | ||||
update_password("Administrator", get_admin_password()) | update_password("Administrator", get_admin_password()) | ||||
@@ -16,6 +16,9 @@ import frappe | |||||
from frappe import _ | from frappe import _ | ||||
from frappe.model.document import Document | from frappe.model.document import Document | ||||
from frappe.query_builder import DocType, Order | from frappe.query_builder import DocType, Order | ||||
from frappe.query_builder.functions import Coalesce, Max | |||||
from frappe.query_builder.utils import DocType | |||||
class NestedSetRecursionError(frappe.ValidationError): pass | class NestedSetRecursionError(frappe.ValidationError): pass | ||||
class NestedSetMultipleRootsError(frappe.ValidationError): pass | class NestedSetMultipleRootsError(frappe.ValidationError): pass | ||||
@@ -51,87 +54,91 @@ def update_add_node(doc, parent, parent_field): | |||||
""" | """ | ||||
insert a new node | insert a new node | ||||
""" | """ | ||||
doctype = doc.doctype | doctype = doc.doctype | ||||
name = doc.name | name = doc.name | ||||
Table = DocType(doctype) | |||||
# get the last sibling of the parent | # get the last sibling of the parent | ||||
if parent: | if parent: | ||||
left, right = frappe.db.sql("select lft, rgt from `tab{0}` where name=%s for update" | |||||
.format(doctype), parent)[0] | |||||
left, right = frappe.db.get_value(doctype, {"name": parent}, ["lft", "rgt"], for_update=True) | |||||
validate_loop(doc.doctype, doc.name, left, right) | validate_loop(doc.doctype, doc.name, left, right) | ||||
else: # root | else: # root | ||||
right = frappe.db.sql(""" | |||||
SELECT COALESCE(MAX(rgt), 0) + 1 FROM `tab{0}` | |||||
WHERE COALESCE(`{1}`, '') = '' | |||||
""".format(doctype, parent_field))[0][0] | |||||
right = frappe.qb.from_(Table).select( | |||||
Coalesce(Max(Table.rgt), 0) | |||||
).where(Coalesce(Table[parent_field], "") == "").run(pluck=True)[0] | |||||
right = right or 1 | right = right or 1 | ||||
# update all on the right | # update all on the right | ||||
frappe.db.sql("update `tab{0}` set rgt = rgt+2 where rgt >= %s" | |||||
.format(doctype), (right,)) | |||||
frappe.db.sql("update `tab{0}` set lft = lft+2 where lft >= %s" | |||||
.format(doctype), (right,)) | |||||
frappe.qb.update(Table).set(Table.rgt, Table.rgt + 2).where(Table.rgt >= right).run() | |||||
frappe.qb.update(Table).set(Table.lft, Table.lft + 2).where(Table.lft >= right).run() | |||||
# update index of new node | |||||
if frappe.db.sql("select * from `tab{0}` where lft=%s or rgt=%s".format(doctype), (right, right+1)): | |||||
frappe.msgprint(_("Nested set error. Please contact the Administrator.")) | |||||
raise Exception | |||||
if frappe.qb.from_(Table).select("*").where((Table.lft == right) | (Table.rgt == right + 1)).run(): | |||||
frappe.throw(_("Nested set error. Please contact the Administrator.")) | |||||
frappe.db.sql("update `tab{0}` set lft=%s, rgt=%s where name=%s".format(doctype), | |||||
(right,right+1, name)) | |||||
# update index of new node | |||||
frappe.qb.update(Table).set(Table.lft, right).set(Table.rgt, right + 1).where(Table.name == name).run() | |||||
return right | return right | ||||
def update_move_node(doc, parent_field): | |||||
parent = doc.get(parent_field) | |||||
def update_move_node(doc: Document, parent_field: str): | |||||
parent: str = doc.get(parent_field) | |||||
Table = DocType(doc.doctype) | |||||
if parent: | if parent: | ||||
new_parent = frappe.db.sql("""select lft, rgt from `tab{0}` | |||||
where name = %s for update""".format(doc.doctype), parent, as_dict=1)[0] | |||||
new_parent = frappe.qb.from_(Table).select( | |||||
Table.lft, Table.rgt | |||||
).where(Table.name == parent).for_update().run(as_dict=True)[0] | |||||
validate_loop(doc.doctype, doc.name, new_parent.lft, new_parent.rgt) | validate_loop(doc.doctype, doc.name, new_parent.lft, new_parent.rgt) | ||||
# move to dark side | # move to dark side | ||||
frappe.db.sql("""update `tab{0}` set lft = -lft, rgt = -rgt | |||||
where lft >= %s and rgt <= %s""".format(doc.doctype), (doc.lft, doc.rgt)) | |||||
frappe.qb.update(Table).set(Table.lft, - Table.lft).set(Table.rgt, - Table.rgt).where( | |||||
(Table.lft >= doc.lft) & (Table.rgt <= doc.rgt) | |||||
).run() | |||||
# shift left | # shift left | ||||
diff = doc.rgt - doc.lft + 1 | diff = doc.rgt - doc.lft + 1 | ||||
frappe.db.sql("""update `tab{0}` set lft = lft -%s, rgt = rgt - %s | |||||
where lft > %s""".format(doc.doctype), (diff, diff, doc.rgt)) | |||||
frappe.qb.update(Table).set(Table.lft, Table.lft - diff).set(Table.rgt, Table.rgt - diff).where( | |||||
Table.lft > doc.rgt | |||||
).run() | |||||
# shift left rgts of ancestors whose only rgts must shift | # shift left rgts of ancestors whose only rgts must shift | ||||
frappe.db.sql("""update `tab{0}` set rgt = rgt - %s | |||||
where lft < %s and rgt > %s""".format(doc.doctype), (diff, doc.lft, doc.rgt)) | |||||
frappe.qb.update(Table).set(Table.rgt, Table.rgt - diff).where( | |||||
(Table.lft < doc.lft) & (Table.rgt > doc.rgt) | |||||
).run() | |||||
if parent: | if parent: | ||||
new_parent = frappe.db.sql("""select lft, rgt from `tab%s` | |||||
where name = %s for update""" % (doc.doctype, '%s'), parent, as_dict=1)[0] | |||||
# set parent lft, rgt | # set parent lft, rgt | ||||
frappe.db.sql("""update `tab{0}` set rgt = rgt + %s | |||||
where name = %s""".format(doc.doctype), (diff, parent)) | |||||
frappe.qb.update(Table).set(Table.rgt, Table.rgt + diff).where(Table.name == parent).run() | |||||
# shift right at new parent | # shift right at new parent | ||||
frappe.db.sql("""update `tab{0}` set lft = lft + %s, rgt = rgt + %s | |||||
where lft > %s""".format(doc.doctype), (diff, diff, new_parent.rgt)) | |||||
frappe.qb.update(Table).set(Table.lft, Table.lft + diff).set(Table.rgt, Table.rgt + diff).where( | |||||
(Table.lft >= new_parent.lft) & (Table.lft <= new_parent.rgt) | |||||
).run() | |||||
# shift right rgts of ancestors whose only rgts must shift | |||||
frappe.db.sql("""update `tab{0}` set rgt = rgt + %s | |||||
where lft < %s and rgt > %s""".format(doc.doctype), | |||||
(diff, new_parent.lft, new_parent.rgt)) | |||||
frappe.qb.update(Table).set(Table.lft, Table.lft + diff).set(Table.rgt, Table.rgt + diff).where( | |||||
Table.lft > new_parent.rgt | |||||
).run() | |||||
# shift right rgts of ancestors whose only rgts must shift | |||||
frappe.qb.update(Table).set(Table.rgt, Table.rgt + diff).where( | |||||
(Table.lft < new_parent.lft) & (Table.rgt > new_parent.rgt) | |||||
).run() | |||||
new_diff = new_parent.rgt - doc.lft | new_diff = new_parent.rgt - doc.lft | ||||
else: | else: | ||||
# new root | # new root | ||||
max_rgt = frappe.db.sql("""select max(rgt) from `tab{0}`""".format(doc.doctype))[0][0] | |||||
max_rgt = frappe.qb.from_(Table).select(Max(Table.rgt)).run(pluck=True)[0] | |||||
new_diff = max_rgt + 1 - doc.lft | new_diff = max_rgt + 1 - doc.lft | ||||
# bring back from dark side | # bring back from dark side | ||||
frappe.db.sql("""update `tab{0}` set lft = -lft + %s, rgt = -rgt + %s | |||||
where lft < 0""".format(doc.doctype), (new_diff, new_diff)) | |||||
frappe.qb.update(Table).set( | |||||
Table.lft, -Table.lft + new_diff | |||||
).set( | |||||
Table.rgt, -Table.rgt + new_diff | |||||
).where(Table.lft < 0).run() | |||||
@frappe.whitelist() | @frappe.whitelist() | ||||
@@ -197,10 +204,10 @@ def rebuild_node(doctype, parent, left, parent_field): | |||||
def validate_loop(doctype, name, lft, rgt): | def validate_loop(doctype, name, lft, rgt): | ||||
"""check if item not an ancestor (loop)""" | """check if item not an ancestor (loop)""" | ||||
if name in frappe.db.sql_list("""select name from `tab{0}` where lft <= %s and rgt >= %s""" | |||||
.format(doctype), (lft, rgt)): | |||||
if name in frappe.get_all(doctype, filters={"lft": ["<=", lft], "rgt": [">=", rgt]}, pluck="name"): | |||||
frappe.throw(_("Item cannot be added to its own descendents"), NestedSetRecursionError) | frappe.throw(_("Item cannot be added to its own descendents"), NestedSetRecursionError) | ||||
class NestedSet(Document): | class NestedSet(Document): | ||||
def __setup__(self): | def __setup__(self): | ||||
if self.meta.get("nsm_parent_field"): | if self.meta.get("nsm_parent_field"): | ||||
@@ -232,9 +239,7 @@ class NestedSet(Document): | |||||
raise | raise | ||||
def validate_if_child_exists(self): | def validate_if_child_exists(self): | ||||
has_children = frappe.db.sql("""select count(name) from `tab{doctype}` | |||||
where `{nsm_parent_field}`=%s""".format(doctype=self.doctype, nsm_parent_field=self.nsm_parent_field), | |||||
(self.name,))[0][0] | |||||
has_children = frappe.db.count(self.doctype, filters={self.nsm_parent_field: self.name}) | |||||
if has_children: | if has_children: | ||||
frappe.throw(_("Cannot delete {0} as it has child nodes").format(self.name), NestedSetChildExistsError) | frappe.throw(_("Cannot delete {0} as it has child nodes").format(self.name), NestedSetChildExistsError) | ||||
@@ -251,8 +256,7 @@ class NestedSet(Document): | |||||
parent_field = self.nsm_parent_field | parent_field = self.nsm_parent_field | ||||
# set old_parent for children | # set old_parent for children | ||||
frappe.db.sql("update `tab{0}` set old_parent=%s where {1}=%s" | |||||
.format(self.doctype, parent_field), (newdn, newdn)) | |||||
frappe.db.set_value(self.doctype, {"old_parent": newdn}, {parent_field: newdn}, update_modified=False, for_update=False) | |||||
if merge: | if merge: | ||||
rebuild_tree(self.doctype, parent_field) | rebuild_tree(self.doctype, parent_field) | ||||
@@ -269,8 +273,7 @@ class NestedSet(Document): | |||||
def validate_ledger(self, group_identifier="is_group"): | def validate_ledger(self, group_identifier="is_group"): | ||||
if hasattr(self, group_identifier) and not bool(self.get(group_identifier)): | if hasattr(self, group_identifier) and not bool(self.get(group_identifier)): | ||||
if frappe.db.sql("""select name from `tab{0}` where {1}=%s and docstatus!=2""" | |||||
.format(self.doctype, self.nsm_parent_field), (self.name)): | |||||
if frappe.get_all(self.doctype, {self.nsm_parent_field: self.name, "docstatus": ("!=", 2)}): | |||||
frappe.throw(_("{0} {1} cannot be a leaf node as it has children").format(_(self.doctype), self.name)) | frappe.throw(_("{0} {1} cannot be a leaf node as it has children").format(_(self.doctype), self.name)) | ||||
def get_ancestors(self): | def get_ancestors(self): | ||||
@@ -291,10 +294,20 @@ class NestedSet(Document): | |||||
def get_root_of(doctype): | def get_root_of(doctype): | ||||
"""Get root element of a DocType with a tree structure""" | """Get root element of a DocType with a tree structure""" | ||||
result = frappe.db.sql("""select t1.name from `tab{0}` t1 where | |||||
(select count(*) from `tab{1}` t2 where | |||||
t2.lft < t1.lft and t2.rgt > t1.rgt) = 0 | |||||
and t1.rgt > t1.lft""".format(doctype, doctype)) | |||||
from frappe.query_builder.functions import Count | |||||
from frappe.query_builder.terms import subqry | |||||
Table = DocType(doctype) | |||||
t1 = Table.as_("t1") | |||||
t2 = Table.as_("t2") | |||||
subq = frappe.qb.from_(t2).select(Count("*")).where( | |||||
(t2.lft < t1.lft) & (t2.rgt > t1.rgt) | |||||
) | |||||
result = frappe.qb.from_(t1).select(t1.name).where( | |||||
(subqry(subq) == 0) & (t1.rgt > t1.lft) | |||||
).run() | |||||
return result[0][0] if result else None | return result[0][0] if result else None | ||||
def get_ancestors_of(doctype, name, order_by="lft desc", limit=None): | def get_ancestors_of(doctype, name, order_by="lft desc", limit=None): | ||||
@@ -1,15 +1,22 @@ | |||||
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors | |||||
# License: MIT. See LICENSE | # License: MIT. See LICENSE | ||||
import frappe, json | |||||
from frappe import _dict | |||||
from email.utils import formataddr | |||||
from typing import Dict, List, Optional, TYPE_CHECKING | |||||
import frappe | |||||
import frappe.share | import frappe.share | ||||
from frappe.utils import cint | |||||
from frappe import _dict | |||||
from frappe.boot import get_allowed_reports | from frappe.boot import get_allowed_reports | ||||
from frappe.permissions import get_roles, get_valid_perms | |||||
from frappe.core.doctype.domain_settings.domain_settings import get_active_modules | from frappe.core.doctype.domain_settings.domain_settings import get_active_modules | ||||
from frappe.permissions import get_roles, get_valid_perms | |||||
from frappe.query_builder import DocType | from frappe.query_builder import DocType | ||||
from frappe.query_builder.functions import Concat_ws | from frappe.query_builder.functions import Concat_ws | ||||
from frappe.query_builder import Order | |||||
if TYPE_CHECKING: | |||||
from frappe.core.doctype.user.user import User | |||||
class UserPermissions: | class UserPermissions: | ||||
""" | """ | ||||
@@ -64,14 +71,14 @@ class UserPermissions: | |||||
def build_doctype_map(self): | def build_doctype_map(self): | ||||
"""build map of special doctype properties""" | """build map of special doctype properties""" | ||||
self.doctype_map = {} | |||||
active_domains = frappe.get_active_domains() | active_domains = frappe.get_active_domains() | ||||
all_doctypes = frappe.get_all("DocType", fields=["name", "in_create", "module", "istable", "issingle", "read_only", "restrict_to_domain"]) | |||||
self.doctype_map = {} | |||||
for r in frappe.db.sql("""select name, in_create, issingle, istable, | |||||
read_only, restrict_to_domain, module from tabDocType""", as_dict=1): | |||||
if (not r.restrict_to_domain) or (r.restrict_to_domain in active_domains): | |||||
self.doctype_map[r['name']] = r | |||||
for dt in all_doctypes: | |||||
if not dt.restrict_to_domain or (dt.restrict_to_domain in active_domains): | |||||
self.doctype_map[dt["name"]] = dt | |||||
def build_perm_map(self): | def build_perm_map(self): | ||||
"""build map of permissions at level 0""" | """build map of permissions at level 0""" | ||||
@@ -150,10 +157,8 @@ class UserPermissions: | |||||
self.can_write += self.in_create | self.can_write += self.in_create | ||||
self.can_read += self.can_write | self.can_read += self.can_write | ||||
self.shared = frappe.db.sql_list("""select distinct share_doctype from `tabDocShare` | |||||
where `user`=%s and `read`=1""", self.name) | |||||
self.shared = frappe.get_all("DocShare", {"user": self.name, "read": 1}, distinct=True, pluck="share_doctype") | |||||
self.can_read = list(set(self.can_read + self.shared)) | self.can_read = list(set(self.can_read + self.shared)) | ||||
self.all_read += self.can_read | self.all_read += self.can_read | ||||
for dt in no_list_view_link: | for dt in no_list_view_link: | ||||
@@ -161,11 +166,12 @@ class UserPermissions: | |||||
self.can_read.remove(dt) | self.can_read.remove(dt) | ||||
if "System Manager" in self.get_roles(): | if "System Manager" in self.get_roles(): | ||||
docs = frappe.get_all("DocType", {'allow_import': 1}) | |||||
self.can_import += [doc.name for doc in docs] | |||||
customizations = frappe.get_all("Property Setter", fields=['doc_type'], filters={'property': 'allow_import', 'value': "1"}) | |||||
self.can_import += [custom.doc_type for custom in customizations] | |||||
self.can_import += frappe.get_all("DocType", {'allow_import': 1}, pluck="name") | |||||
self.can_import += frappe.get_all( | |||||
"Property Setter", | |||||
pluck="doc_type", | |||||
filters={"property": "allow_import", "value": "1"}, | |||||
) | |||||
frappe.cache().hset("can_import", frappe.session.user, self.can_import) | frappe.cache().hset("can_import", frappe.session.user, self.can_import) | ||||
@@ -186,10 +192,24 @@ class UserPermissions: | |||||
return self.can_read | return self.can_read | ||||
def load_user(self): | def load_user(self): | ||||
d = frappe.db.sql("""select email, first_name, last_name, creation, | |||||
email_signature, user_type, desk_theme, language, | |||||
mute_sounds, send_me_a_copy, document_follow_notify | |||||
from tabUser where name = %s""", (self.name,), as_dict=1)[0] | |||||
d = frappe.db.get_value( | |||||
"User", | |||||
self.name, | |||||
[ | |||||
"creation", | |||||
"desk_theme", | |||||
"document_follow_notify", | |||||
"email", | |||||
"email_signature", | |||||
"first_name", | |||||
"language", | |||||
"last_name", | |||||
"mute_sounds", | |||||
"send_me_a_copy", | |||||
"user_type", | |||||
], | |||||
as_dict=True, | |||||
) | |||||
if not self.can_read: | if not self.can_read: | ||||
self.build_permissions() | self.build_permissions() | ||||
@@ -209,142 +229,169 @@ class UserPermissions: | |||||
def get_all_reports(self): | def get_all_reports(self): | ||||
return get_allowed_reports() | return get_allowed_reports() | ||||
def get_user_fullname(user): | |||||
def get_user_fullname(user: str) -> str: | |||||
user_doctype = DocType("User") | user_doctype = DocType("User") | ||||
fullname = frappe.get_value( | |||||
user_doctype, | |||||
filters={"name": user}, | |||||
fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name), | |||||
return ( | |||||
frappe.get_value( | |||||
user_doctype, | |||||
filters={"name": user}, | |||||
fieldname=Concat_ws(" ", user_doctype.first_name, user_doctype.last_name), | |||||
) | |||||
or "" | |||||
) | |||||
def get_fullname_and_avatar(user: str) -> _dict: | |||||
first_name, last_name, avatar, name = frappe.db.get_value( | |||||
"User", user, ["first_name", "last_name", "user_image", "name"] | |||||
) | |||||
return _dict( | |||||
{ | |||||
"fullname": " ".join(list(filter(None, [first_name, last_name]))), | |||||
"avatar": avatar, | |||||
"name": name, | |||||
} | |||||
) | ) | ||||
return fullname or '' | |||||
def get_fullname_and_avatar(user): | |||||
first_name, last_name, avatar, name = frappe.db.get_value("User", | |||||
user, ["first_name", "last_name", "user_image", "name"]) | |||||
return _dict({ | |||||
"fullname": " ".join(list(filter(None, [first_name, last_name]))), | |||||
"avatar": avatar, | |||||
"name": name | |||||
}) | |||||
def get_system_managers(only_name=False): | |||||
def get_system_managers(only_name: bool = False) -> List[str]: | |||||
"""returns all system manager's user details""" | """returns all system manager's user details""" | ||||
import email.utils | |||||
system_managers = frappe.db.sql("""SELECT DISTINCT `name`, `creation`, | |||||
CONCAT_WS(' ', | |||||
CASE WHEN `first_name`= '' THEN NULL ELSE `first_name` END, | |||||
CASE WHEN `last_name`= '' THEN NULL ELSE `last_name` END | |||||
) AS fullname | |||||
FROM `tabUser` AS p | |||||
WHERE `docstatus` < 2 | |||||
AND `enabled` = 1 | |||||
AND `name` NOT IN ({}) | |||||
AND exists | |||||
(SELECT * | |||||
FROM `tabHas Role` AS ur | |||||
WHERE ur.parent = p.name | |||||
AND ur.role='System Manager') | |||||
ORDER BY `creation` DESC""".format(", ".join(["%s"]*len(frappe.STANDARD_USERS))), | |||||
frappe.STANDARD_USERS, as_dict=True) | |||||
HasRole = DocType("Has Role") | |||||
User = DocType("User") | |||||
if only_name: | |||||
fields = [User.name] | |||||
else: | |||||
fields = [User.full_name, User.name] | |||||
system_managers = ( | |||||
frappe.qb.from_(User) | |||||
.join(HasRole) | |||||
.on((HasRole.parent == User.name)) | |||||
.where( | |||||
(HasRole.parenttype == "User") | |||||
& (User.enabled == 1) | |||||
& (HasRole.role == "System Manager") | |||||
& (User.docstatus < 2) | |||||
& (User.name.notin(frappe.STANDARD_USERS)) | |||||
) | |||||
.select(*fields) | |||||
.orderby(User.creation, order=Order.desc) | |||||
.run(as_dict=True) | |||||
) | |||||
if only_name: | if only_name: | ||||
return [p.name for p in system_managers] | return [p.name for p in system_managers] | ||||
else: | else: | ||||
return [email.utils.formataddr((p.fullname, p.name)) for p in system_managers] | |||||
return [formataddr((p.full_name, p.name)) for p in system_managers] | |||||
def add_role(user, role): | |||||
def add_role(user: str, role: str) -> None: | |||||
frappe.get_doc("User", user).add_roles(role) | frappe.get_doc("User", user).add_roles(role) | ||||
def add_system_manager(email, first_name=None, last_name=None, send_welcome_email=False, password=None): | |||||
def add_system_manager( | |||||
email: str, | |||||
first_name: Optional[str] = None, | |||||
last_name: Optional[str] = None, | |||||
send_welcome_email: bool = False, | |||||
password: str = None, | |||||
) -> "User": | |||||
# add user | # add user | ||||
user = frappe.new_doc("User") | user = frappe.new_doc("User") | ||||
user.update({ | |||||
"name": email, | |||||
"email": email, | |||||
"enabled": 1, | |||||
"first_name": first_name or email, | |||||
"last_name": last_name, | |||||
"user_type": "System User", | |||||
"send_welcome_email": 1 if send_welcome_email else 0 | |||||
}) | |||||
user.update( | |||||
{ | |||||
"name": email, | |||||
"email": email, | |||||
"enabled": 1, | |||||
"first_name": first_name or email, | |||||
"last_name": last_name, | |||||
"user_type": "System User", | |||||
"send_welcome_email": 1 if send_welcome_email else 0, | |||||
} | |||||
) | |||||
user.insert() | user.insert() | ||||
# add roles | # add roles | ||||
roles = frappe.get_all('Role', | |||||
fields=['name'], | |||||
filters={ | |||||
'name': ['not in', ('Administrator', 'Guest', 'All')] | |||||
} | |||||
roles = frappe.get_all( | |||||
"Role", | |||||
fields=["name"], | |||||
filters={"name": ["not in", ("Administrator", "Guest", "All")]}, | |||||
) | ) | ||||
roles = [role.name for role in roles] | roles = [role.name for role in roles] | ||||
user.add_roles(*roles) | user.add_roles(*roles) | ||||
if password: | if password: | ||||
from frappe.utils.password import update_password | from frappe.utils.password import update_password | ||||
update_password(user=user.name, pwd=password) | update_password(user=user.name, pwd=password) | ||||
return user | |||||
def get_enabled_system_users(): | |||||
# add more fields if required | |||||
return frappe.get_all('User', | |||||
fields=['email', 'language', 'name'], | |||||
def get_enabled_system_users() -> List[Dict]: | |||||
return frappe.get_all( | |||||
"User", | |||||
fields=["email", "language", "name"], | |||||
filters={ | filters={ | ||||
'user_type': 'System User', | |||||
'enabled': 1, | |||||
'name': ['not in', ('Administrator', 'Guest')] | |||||
} | |||||
"user_type": "System User", | |||||
"enabled": 1, | |||||
"name": ["not in", ("Administrator", "Guest")], | |||||
}, | |||||
) | ) | ||||
def is_website_user(): | |||||
return frappe.db.get_value('User', frappe.session.user, 'user_type') == "Website User" | |||||
def is_system_user(username): | |||||
return frappe.db.get_value("User", {"email": username, "enabled": 1, "user_type": "System User"}) | |||||
def is_website_user(username: Optional[str] = None) -> Optional[str]: | |||||
return ( | |||||
frappe.db.get_value("User", username or frappe.session.user, "user_type") | |||||
== "Website User" | |||||
) | |||||
def is_system_user(username: Optional[str] = None) -> Optional[str]: | |||||
return frappe.db.get_value( | |||||
"User", | |||||
{ | |||||
"email": username or frappe.session.user, | |||||
"enabled": 1, | |||||
"user_type": "System User", | |||||
}, | |||||
) | |||||
def get_users(): | |||||
def get_users() -> List[Dict]: | |||||
from frappe.core.doctype.user.user import get_system_users | from frappe.core.doctype.user.user import get_system_users | ||||
users = [] | users = [] | ||||
system_managers = frappe.utils.user.get_system_managers(only_name=True) | |||||
system_managers = get_system_managers(only_name=True) | |||||
for user in get_system_users(): | for user in get_system_users(): | ||||
users.append({ | |||||
"full_name": frappe.utils.user.get_user_fullname(user), | |||||
"email": user, | |||||
"is_system_manager": 1 if (user in system_managers) else 0 | |||||
}) | |||||
users.append( | |||||
{ | |||||
"full_name": get_user_fullname(user), | |||||
"email": user, | |||||
"is_system_manager": user in system_managers, | |||||
} | |||||
) | |||||
return users | return users | ||||
def set_last_active_to_now(user): | |||||
from frappe.utils import now_datetime | |||||
frappe.db.set_value("User", user, "last_active", now_datetime()) | |||||
def reset_simultaneous_sessions(user_limit): | |||||
for user in frappe.db.sql("""select name, simultaneous_sessions from tabUser | |||||
where name not in ('Administrator', 'Guest') and user_type = 'System User' and enabled=1 | |||||
order by creation desc""", as_dict=1): | |||||
if user.simultaneous_sessions < user_limit: | |||||
user_limit = user_limit - user.simultaneous_sessions | |||||
else: | |||||
frappe.db.set_value("User", user.name, "simultaneous_sessions", 1) | |||||
user_limit = user_limit - 1 | |||||
def get_link_to_reset_password(user): | |||||
link = '' | |||||
if not cint(frappe.db.get_single_value('System Settings', 'setup_complete')): | |||||
user = frappe.get_doc("User", user) | |||||
link = user.reset_password(send_email=False) | |||||
frappe.db.commit() | |||||
return { | |||||
'link': link | |||||
} | |||||
def get_users_with_role(role): | |||||
return [p[0] for p in frappe.db.sql("""SELECT DISTINCT `tabUser`.`name` | |||||
FROM `tabHas Role`, `tabUser` | |||||
WHERE `tabHas Role`.`role`=%s | |||||
AND `tabUser`.`name`!='Administrator' | |||||
AND `tabHas Role`.`parent`=`tabUser`.`name` | |||||
AND `tabUser`.`enabled`=1""", role)] | |||||
def get_users_with_role(role: str) -> List[str]: | |||||
User = DocType("User") | |||||
HasRole = DocType("Has Role") | |||||
return ( | |||||
frappe.qb.from_(HasRole) | |||||
.from_(User) | |||||
.where( | |||||
(HasRole.role == role) | |||||
& (User.name != "Administrator") | |||||
& (User.enabled == 1) | |||||
& (HasRole.parent == User.name) | |||||
) | |||||
.select(User.name) | |||||
.distinct() | |||||
.run(pluck=True) | |||||
) |