瀏覽代碼

refactor: frappe.utils.goal

* Deprecate filter_str in whitelisted get_monthly_goal_graph_data API;
  use filter Dict instead
* Simplify logic
* Use qb notation instead of raw SQLs
* Raise 403 if user does not have access to document

Note: This includes a security fix
version-14
Gavin D'souza 3 年之前
父節點
當前提交
b4485665ad
共有 1 個檔案被更改,包括 120 行新增128 行删除
  1. +120
    -128
      frappe/utils/goal.py

+ 120
- 128
frappe/utils/goal.py 查看文件

@@ -1,157 +1,149 @@
# Copyright (c) 2021, Frappe Technologies Pvt. Ltd. and Contributors
# Copyright (c) 2022, Frappe Technologies Pvt. Ltd. and Contributors
# License: MIT. See LICENSE

from typing import Dict, Optional

import frappe
from frappe import _
from frappe.query_builder.functions import DateFormat, Function
from frappe.query_builder.utils import DocType
from frappe.utils.data import add_to_date, flt, now_datetime
from frappe.utils.formatters import format_value
from contextlib import suppress

def get_monthly_results(
goal_doctype: str,
goal_field: str,
date_col: str,
filters: Dict,
aggregation: str = "sum",
) -> Dict:
"""Get monthly aggregation values for given field of doctype"""

Table = DocType(goal_doctype)
date_format = "%m-%Y" if frappe.db.db_type != "postgres" else "MM-YYYY"

return dict(
frappe.db.query.build_conditions(table=goal_doctype, filters=filters)
.select(
DateFormat(Table[date_col], date_format).as_("month_year"),
Function(aggregation, goal_field),
)
.groupby("month_year")
.run()
)


def get_monthly_results(goal_doctype, goal_field, date_col, filter_str, aggregation = 'sum'):
'''Get monthly aggregation values for given field of doctype'''
# TODO: move to ORM?
if(frappe.db.db_type == 'postgres'):
month_year_format_query = '''to_char("{}", 'MM-YYYY')'''.format(date_col)
else:
month_year_format_query = 'date_format(`{}`, "%m-%Y")'.format(date_col)

conditions = ('where ' + filter_str) if filter_str else ''
results = frappe.db.sql('''SELECT {aggregation}(`{goal_field}`) AS {goal_field},
{month_year_format_query} AS month_year
FROM `{table_name}` {conditions}
GROUP BY month_year'''
.format(
aggregation=aggregation,
goal_field=goal_field,
month_year_format_query=month_year_format_query,
table_name="tab" + goal_doctype,
conditions=conditions
), as_dict=True)

month_to_value_dict = {}
for d in results:
month_to_value_dict[d['month_year']] = d[goal_field]

return month_to_value_dict

@frappe.whitelist()
def get_monthly_goal_graph_data(title, doctype, docname, goal_value_field, goal_total_field, goal_history_field,
goal_doctype, goal_doctype_link, goal_field, date_field, filter_str, aggregation="sum"):
'''
Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype

:param title: Graph title
:param doctype: doctype of graph doc
:param docname: of the doc to set the graph in
:param goal_value_field: goal field of doctype
:param goal_total_field: current month value field of doctype
:param goal_history_field: cached history field
:param goal_doctype: doctype the goal is based on
:param goal_doctype_link: doctype link field in goal_doctype
:param goal_field: field from which the goal is calculated
:param filter_str: where clause condition
:param aggregation: a value like 'count', 'sum', 'avg'

:return: dict of graph data
'''

from frappe.utils.formatters import format_value
import json

# should have atleast read perm
if not frappe.has_permission(goal_doctype):
return None

meta = frappe.get_meta(doctype)
def get_monthly_goal_graph_data(
title: str,
doctype: str,
docname: str,
goal_value_field: str,
goal_total_field: str,
goal_history_field: str,
goal_doctype: str,
goal_doctype_link: str,
goal_field: str,
date_field: str,
filter_str: str = None,
aggregation: str = "sum",
filters: Optional[Dict] = None,
) -> Dict:
"""
Get month-wise graph data for a doctype based on aggregation values of a field in the goal doctype

:param title: Graph title
:param doctype: doctype of graph doc
:param docname: of the doc to set the graph in
:param goal_value_field: goal field of doctype
:param goal_total_field: current month value field of doctype
:param goal_history_field: cached history field
:param goal_doctype: doctype the goal is based on
:param goal_doctype_link: doctype link field in goal_doctype
:param goal_field: field from which the goal is calculated
:param filter_str: [DEPRECATED] where clause condition. Use filters.
:param aggregation: a value like 'count', 'sum', 'avg'
:param filters: optional filters

:return: dict of graph data
"""
if isinstance(filter_str, str):
frappe.throw("String filters have been deprecated. Pass Dict filters instead.", exc=DeprecationWarning)

doc = frappe.get_doc(doctype, docname)
doc.check_permission()

meta = doc.meta
goal = doc.get(goal_value_field)
formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc)
today_date = now_datetime().date()

current_month_value = doc.get(goal_total_field)
formatted_value = format_value(current_month_value, meta.get_field(goal_total_field), doc)

from frappe.utils import today, getdate, formatdate, add_months
current_month_year = formatdate(today(), "MM-yyyy")

current_month_year = today_date.strftime("%m-%Y") # eg: "02-2022"
formatted_value = format_value(
current_month_value, meta.get_field(goal_total_field), doc
)
history = doc.get(goal_history_field)
try:
month_to_value_dict = json.loads(history) if history and '{' in history else None
except ValueError:
month_to_value_dict = None

month_to_value_dict = None
if history:
with suppress(ValueError):
month_to_value_dict = frappe.parse_json(history)

if month_to_value_dict is None:
doc_filter = (goal_doctype_link + " = " + frappe.db.escape(docname)) if doctype != goal_doctype else ''
if filter_str:
doc_filter += ' and ' + filter_str if doc_filter else filter_str
month_to_value_dict = get_monthly_results(goal_doctype, goal_field, date_field, doc_filter, aggregation)
doc_filter = {}
with suppress(ValueError):
doc_filter = frappe.parse_json(filters or "{}")
if doctype != goal_doctype:
doc_filter[goal_doctype_link] = docname

month_to_value_dict = get_monthly_results(
goal_doctype, goal_field, date_field, doc_filter, aggregation
)

month_to_value_dict[current_month_year] = current_month_value

months = []
months_formatted = []
values = []
month_labels = []
dataset_values = []
values_formatted = []
for i in range(0, 12):
date_value = add_months(today(), -i)
month_value = formatdate(date_value, "MM-yyyy")
month_word = getdate(date_value).strftime('%b %y')
month_year = getdate(date_value).strftime('%B') + ', ' + getdate(date_value).strftime('%Y')
months.insert(0, month_word)
months_formatted.insert(0, month_year)
if month_value in month_to_value_dict:
val = month_to_value_dict[month_value]
else:
val = 0
values.insert(0, val)
values_formatted.insert(0, format_value(val, meta.get_field(goal_total_field), doc))

y_markers = []
y_markers = {}

summary_values = [
{
'title': _("This month"),
'color': '#ffa00a',
'value': formatted_value
}
{"title": _("This month"), "color": "#ffa00a", "value": formatted_value},
]

if float(goal) > 0:
y_markers = [
{
'label': _("Goal"),
'lineType': "dashed",
'value': goal
},
]
if flt(goal) > 0:
formatted_goal = format_value(goal, meta.get_field(goal_value_field), doc)
summary_values += [
{"title": _("Goal"), "color": "#5e64ff", "value": formatted_goal},
{
'title': _("Goal"),
'color': '#5e64ff',
'value': formatted_goal
"title": _("Completed"),
"color": "#28a745",
"value": f"{int(round(flt(current_month_value) / flt(goal) * 100))}%",
},
{
'title': _("Completed"),
'color': '#28a745',
'value': str(int(round(float(current_month_value)/float(goal)*100))) + "%"
}
]
y_markers = {
"yMarkers": [{"label": _("Goal"), "lineType": "dashed", "value": flt(goal)}]
}

data = {
'title': title,
# 'subtitle':

'data': {
'datasets': [
{
'values': values,
'formatted': values_formatted
}
],
'labels': months,
for i in range(12):
date_value = add_to_date(today_date, months=-i, as_datetime=True)
month_word = date_value.strftime("%b %y") # eg: "Feb 22"
month_labels.insert(0, month_word)

month_value = date_value.strftime("%m-%Y") # eg: "02-2022"
val = month_to_value_dict.get(month_value, 0)
dataset_values.insert(0, val)
values_formatted.insert(
0, format_value(val, meta.get_field(goal_total_field), doc)
)

return {
"title": title,
"data": {
"datasets": [{"values": dataset_values, "formatted": values_formatted}],
"labels": month_labels,
**y_markers,
},

'summary': summary_values,
"summary": summary_values,
}

if y_markers:
data["data"]["yMarkers"] = y_markers

return data

Loading…
取消
儲存