From b4485665ad1c325c4f3935a99f7bdf28cc21acb0 Mon Sep 17 00:00:00 2001 From: Gavin D'souza Date: Thu, 31 Mar 2022 13:42:13 +0530 Subject: [PATCH] refactor: frappe.utils.goal * Deprecate filter_str in whitelisted get_monthly_goal_graph_data API; use filter Dict instead * Simplify logic * Use qb notation instead of raw SQLs * Raise 403 if user does not have access to document Note: This includes a security fix --- frappe/utils/goal.py | 248 +++++++++++++++++++++---------------------- 1 file changed, 120 insertions(+), 128 deletions(-) diff --git a/frappe/utils/goal.py b/frappe/utils/goal.py index 2d7e73eb1a..97624e6272 100644 --- a/frappe/utils/goal.py +++ b/frappe/utils/goal.py @@ -1,157 +1,149 @@ -# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors # License: MIT. See LICENSE +from typing import Dict, Optional + import frappe from frappe import _ +from frappe.query_builder.functions import DateFormat, Function +from frappe.query_builder.utils import DocType +from frappe.utils.data import add_to_date, flt, now_datetime +from frappe.utils.formatters import format_value +from contextlib import suppress + +def get_monthly_results( + goal_doctype: str, + goal_field: str, + date_col: str, + filters: Dict, + aggregation: str = "sum", +) -> Dict: + """Get monthly aggregation values for given field of doctype""" + + Table = DocType(goal_doctype) + date_format = "%m-%Y" if frappe.db.db_type != "postgres" else "MM-YYYY" + + return dict( + frappe.db.query.build_conditions(table=goal_doctype, filters=filters) + .select( + DateFormat(Table[date_col], date_format).as_("month_year"), + Function(aggregation, goal_field), + ) + .groupby("month_year") + .run() + ) -def get_monthly_results(goal_doctype, goal_field, date_col, filter_str, aggregation = 'sum'): - '''Get monthly aggregation values for given field of doctype''' - # TODO: move to ORM? - if(frappe.db.db_type == 'postgres'): - month_year_format_query = '''to_char("{}", 'MM-YYYY')'''.format(date_col) - else: - month_year_format_query = 'date_format(`{}`, "%m-%Y")'.format(date_col) - - conditions = ('where ' + filter_str) if filter_str else '' - results = frappe.db.sql('''SELECT {aggregation}(`{goal_field}`) AS {goal_field}, - {month_year_format_query} AS month_year - FROM `{table_name}` {conditions} - GROUP BY month_year''' - .format( - aggregation=aggregation, - goal_field=goal_field, - month_year_format_query=month_year_format_query, - table_name="tab" + goal_doctype, - conditions=conditions - ), as_dict=True) - - month_to_value_dict = {} - for d in results: - month_to_value_dict[d['month_year']] = d[goal_field] - - return month_to_value_dict - @frappe.whitelist() -def get_monthly_goal_graph_data(title, doctype, docname, goal_value_field, goal_total_field, goal_history_field, - goal_doctype, goal_doctype_link, goal_field, date_field, filter_str, aggregation="sum"): - ''' - Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype - - :param title: Graph title - :param doctype: doctype of graph doc - :param docname: of the doc to set the graph in - :param goal_value_field: goal field of doctype - :param goal_total_field: current month value field of doctype - :param goal_history_field: cached history field - :param goal_doctype: doctype the goal is based on - :param goal_doctype_link: doctype link field in goal_doctype - :param goal_field: field from which the goal is calculated - :param filter_str: where clause condition - :param aggregation: a value like 'count', 'sum', 'avg' - - :return: dict of graph data - ''' - - from frappe.utils.formatters import format_value - import json - - # should have atleast read perm - if not frappe.has_permission(goal_doctype): - return None - - meta = frappe.get_meta(doctype) +def get_monthly_goal_graph_data( + title: str, + doctype: str, + docname: str, + goal_value_field: str, + goal_total_field: str, + goal_history_field: str, + goal_doctype: str, + goal_doctype_link: str, + goal_field: str, + date_field: str, + filter_str: str = None, + aggregation: str = "sum", + filters: Optional[Dict] = None, +) -> Dict: + """ + Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype + + :param title: Graph title + :param doctype: doctype of graph doc + :param docname: of the doc to set the graph in + :param goal_value_field: goal field of doctype + :param goal_total_field: current month value field of doctype + :param goal_history_field: cached history field + :param goal_doctype: doctype the goal is based on + :param goal_doctype_link: doctype link field in goal_doctype + :param goal_field: field from which the goal is calculated + :param filter_str: [DEPRECATED] where clause condition. Use filters. + :param aggregation: a value like 'count', 'sum', 'avg' + :param filters: optional filters + + :return: dict of graph data + """ + if isinstance(filter_str, str): + frappe.throw("String filters have been deprecated. Pass Dict filters instead.", exc=DeprecationWarning) + doc = frappe.get_doc(doctype, docname) + doc.check_permission() + meta = doc.meta goal = doc.get(goal_value_field) - formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc) + today_date = now_datetime().date() current_month_value = doc.get(goal_total_field) - formatted_value = format_value(current_month_value, meta.get_field(goal_total_field), doc) - - from frappe.utils import today, getdate, formatdate, add_months - current_month_year = formatdate(today(), "MM-yyyy") - + current_month_year = today_date.strftime("%m-%Y") # eg: "02-2022" + formatted_value = format_value( + current_month_value, meta.get_field(goal_total_field), doc + ) history = doc.get(goal_history_field) - try: - month_to_value_dict = json.loads(history) if history and '{' in history else None - except ValueError: - month_to_value_dict = None + + month_to_value_dict = None + if history: + with suppress(ValueError): + month_to_value_dict = frappe.parse_json(history) if month_to_value_dict is None: - doc_filter = (goal_doctype_link + " = " + frappe.db.escape(docname)) if doctype != goal_doctype else '' - if filter_str: - doc_filter += ' and ' + filter_str if doc_filter else filter_str - month_to_value_dict = get_monthly_results(goal_doctype, goal_field, date_field, doc_filter, aggregation) + doc_filter = {} + with suppress(ValueError): + doc_filter = frappe.parse_json(filters or "{}") + if doctype != goal_doctype: + doc_filter[goal_doctype_link] = docname + + month_to_value_dict = get_monthly_results( + goal_doctype, goal_field, date_field, doc_filter, aggregation + ) month_to_value_dict[current_month_year] = current_month_value - months = [] - months_formatted = [] - values = [] + month_labels = [] + dataset_values = [] values_formatted = [] - for i in range(0, 12): - date_value = add_months(today(), -i) - month_value = formatdate(date_value, "MM-yyyy") - month_word = getdate(date_value).strftime('%b %y') - month_year = getdate(date_value).strftime('%B') + ', ' + getdate(date_value).strftime('%Y') - months.insert(0, month_word) - months_formatted.insert(0, month_year) - if month_value in month_to_value_dict: - val = month_to_value_dict[month_value] - else: - val = 0 - values.insert(0, val) - values_formatted.insert(0, format_value(val, meta.get_field(goal_total_field), doc)) - - y_markers = [] + y_markers = {} + summary_values = [ - { - 'title': _("This month"), - 'color': '#ffa00a', - 'value': formatted_value - } + {"title": _("This month"), "color": "#ffa00a", "value": formatted_value}, ] - if float(goal) > 0: - y_markers = [ - { - 'label': _("Goal"), - 'lineType': "dashed", - 'value': goal - }, - ] + if flt(goal) > 0: + formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc) summary_values += [ + {"title": _("Goal"), "color": "#5e64ff", "value": formatted_goal}, { - 'title': _("Goal"), - 'color': '#5e64ff', - 'value': formatted_goal + "title": _("Completed"), + "color": "#28a745", + "value": f"{int(round(flt(current_month_value) / flt(goal) * 100))}%", }, - { - 'title': _("Completed"), - 'color': '#28a745', - 'value': str(int(round(float(current_month_value)/float(goal)*100))) + "%" - } ] + y_markers = { + "yMarkers": [{"label": _("Goal"), "lineType": "dashed", "value": flt(goal)}] + } - data = { - 'title': title, - # 'subtitle': - - 'data': { - 'datasets': [ - { - 'values': values, - 'formatted': values_formatted - } - ], - 'labels': months, + for i in range(12): + date_value = add_to_date(today_date, months=-i, as_datetime=True) + month_word = date_value.strftime("%b %y") # eg: "Feb 22" + month_labels.insert(0, month_word) + + month_value = date_value.strftime("%m-%Y") # eg: "02-2022" + val = month_to_value_dict.get(month_value, 0) + dataset_values.insert(0, val) + values_formatted.insert( + 0, format_value(val, meta.get_field(goal_total_field), doc) + ) + + return { + "title": title, + "data": { + "datasets": [{"values": dataset_values, "formatted": values_formatted}], + "labels": month_labels, + **y_markers, }, - - 'summary': summary_values, + "summary": summary_values, } - - if y_markers: - data["data"]["yMarkers"] = y_markers - - return data