|
|
@@ -13,24 +13,46 @@ from frappe.model.workflow import apply_workflow, get_workflow_name, has_approva |
|
|
|
from frappe.desk.notifications import clear_doctype_notifications |
|
|
|
from frappe.utils.user import get_users_with_role |
|
|
|
from frappe.utils.data import get_link_to_form |
|
|
|
from frappe.query_builder import DocType |
|
|
|
|
|
|
|
class WorkflowAction(Document): |
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
def on_doctype_update(): |
|
|
|
frappe.db.add_index("Workflow Action", ["status", "user"]) |
|
|
|
# The search order in any use case is no ["reference_name", "reference_doctype", "status"] |
|
|
|
# The index scan would happen from left to right |
|
|
|
# so even if status is not in the where clause the index will be used |
|
|
|
frappe.db.add_index("Workflow Action", ["reference_name", "reference_doctype", "status"]) |
|
|
|
|
|
|
|
def get_permission_query_conditions(user): |
|
|
|
if not user: user = frappe.session.user |
|
|
|
|
|
|
|
if user == "Administrator": return "" |
|
|
|
|
|
|
|
return "(`tabWorkflow Action`.`user`='{user}')".format(user=user) |
|
|
|
roles = frappe.get_roles(user) |
|
|
|
|
|
|
|
WorkflowAction = DocType("Workflow Action") |
|
|
|
WorkflowActionPermittedRole = DocType("Workflow Action Permitted Role") |
|
|
|
|
|
|
|
permitted_workflow_actions = (frappe.qb.from_(WorkflowAction) |
|
|
|
.join(WorkflowActionPermittedRole) |
|
|
|
.on(WorkflowAction.name == WorkflowActionPermittedRole.parent) |
|
|
|
.select(WorkflowAction.name) |
|
|
|
.where(WorkflowActionPermittedRole.role.isin(roles)) |
|
|
|
).get_sql() |
|
|
|
|
|
|
|
return f"""(`tabWorkflow Action`.`name` in ({permitted_workflow_actions}) |
|
|
|
or `tabWorkflow Action`.`user`='{user}') |
|
|
|
and `tabWorkflow Action`.`status`='Open'""" |
|
|
|
|
|
|
|
def has_permission(doc, user): |
|
|
|
if user not in ['Administrator', doc.user]: |
|
|
|
return False |
|
|
|
|
|
|
|
user_roles = set(frappe.get_roles(user)) |
|
|
|
|
|
|
|
permitted_roles = {permitted_role.role for permitted_role in doc.permitted_roles} |
|
|
|
|
|
|
|
return user == "Administrator" or user_roles.intersection(permitted_roles) |
|
|
|
|
|
|
|
def process_workflow_actions(doc, state): |
|
|
|
workflow = get_workflow_name(doc.get('doctype')) |
|
|
@@ -42,19 +64,18 @@ def process_workflow_actions(doc, state): |
|
|
|
|
|
|
|
if is_workflow_action_already_created(doc): return |
|
|
|
|
|
|
|
clear_old_workflow_actions(doc) |
|
|
|
update_completed_workflow_actions(doc) |
|
|
|
update_completed_workflow_actions(doc, workflow=workflow, workflow_state=get_doc_workflow_state(doc)) |
|
|
|
clear_doctype_notifications('Workflow Action') |
|
|
|
|
|
|
|
next_possible_transitions = get_next_possible_transitions(workflow, get_doc_workflow_state(doc), doc) |
|
|
|
|
|
|
|
if not next_possible_transitions: return |
|
|
|
|
|
|
|
user_data_map = get_users_next_action_data(next_possible_transitions, doc) |
|
|
|
user_data_map, roles = get_users_next_action_data(next_possible_transitions, doc) |
|
|
|
|
|
|
|
if not user_data_map: return |
|
|
|
|
|
|
|
create_workflow_actions_for_users(user_data_map.keys(), doc) |
|
|
|
create_workflow_actions_for_roles(roles, doc) |
|
|
|
|
|
|
|
if send_email_alert(workflow): |
|
|
|
enqueue(send_workflow_action_email, queue='short', users_data=list(user_data_map.values()), doc=doc) |
|
|
@@ -132,20 +153,85 @@ def return_link_expired_page(doc, doc_workflow_state): |
|
|
|
frappe.bold(frappe.get_value('User', doc.get("modified_by"), 'full_name')) |
|
|
|
), indicator_color='blue') |
|
|
|
|
|
|
|
def clear_old_workflow_actions(doc, user=None): |
|
|
|
|
|
|
|
def update_completed_workflow_actions(doc, user=None, workflow=None, workflow_state=None): |
|
|
|
allowed_roles = get_allowed_roles(user, workflow, workflow_state) |
|
|
|
# There is no transaction leading upto this state |
|
|
|
# so no older actions to complete |
|
|
|
if not allowed_roles: |
|
|
|
return |
|
|
|
if workflow_action := get_workflow_action_by_role(doc, allowed_roles): |
|
|
|
update_completed_workflow_actions_using_role(doc, user, allowed_roles, workflow_action) |
|
|
|
else: |
|
|
|
# backwards compatibility |
|
|
|
# for workflow actions saved using user |
|
|
|
clear_old_workflow_actions_using_user(doc, user) |
|
|
|
update_completed_workflow_actions_using_user(doc, user) |
|
|
|
|
|
|
|
def get_allowed_roles(user, workflow, workflow_state): |
|
|
|
user = user if user else frappe.session.user |
|
|
|
frappe.db.delete("Workflow Action", { |
|
|
|
"reference_doctype": doc.get("doctype"), |
|
|
|
"reference_name": doc.get("name"), |
|
|
|
"user": ("!=", user), |
|
|
|
"status": "Open" |
|
|
|
}) |
|
|
|
|
|
|
|
def update_completed_workflow_actions(doc, user=None): |
|
|
|
allowed_roles = frappe.get_all('Workflow Transition', |
|
|
|
fields='allowed', |
|
|
|
filters=[ |
|
|
|
['parent', '=', workflow], |
|
|
|
['next_state', '=', workflow_state] |
|
|
|
], |
|
|
|
pluck = 'allowed') |
|
|
|
|
|
|
|
user_roles = set(frappe.get_roles(user)) |
|
|
|
return set(allowed_roles).intersection(user_roles) |
|
|
|
|
|
|
|
def get_workflow_action_by_role(doc, allowed_roles): |
|
|
|
WorkflowAction = DocType("Workflow Action") |
|
|
|
WorkflowActionPermittedRole = DocType("Workflow Action Permitted Role") |
|
|
|
return (frappe.qb.from_(WorkflowAction).join(WorkflowActionPermittedRole) |
|
|
|
.on(WorkflowAction.name == WorkflowActionPermittedRole.parent) |
|
|
|
.select(WorkflowAction.name, WorkflowActionPermittedRole.role) |
|
|
|
.where((WorkflowAction.reference_name == doc.get('name')) |
|
|
|
& (WorkflowAction.reference_doctype == doc.get('doctype')) |
|
|
|
& (WorkflowAction.status == 'Open') |
|
|
|
& (WorkflowActionPermittedRole.role.isin(list(allowed_roles)))) |
|
|
|
.orderby(WorkflowActionPermittedRole.role).limit(1)).run(as_dict=True) |
|
|
|
|
|
|
|
def update_completed_workflow_actions_using_role(doc, user=None, allowed_roles = set(), workflow_action=None): |
|
|
|
user = user if user else frappe.session.user |
|
|
|
frappe.db.sql("""UPDATE `tabWorkflow Action` SET `status`='Completed', `completed_by`=%s |
|
|
|
WHERE `reference_doctype`=%s AND `reference_name`=%s AND `user`=%s AND `status`='Open'""", |
|
|
|
(user, doc.get('doctype'), doc.get('name'), user)) |
|
|
|
WorkflowAction = DocType("Workflow Action") |
|
|
|
|
|
|
|
if not workflow_action: |
|
|
|
return |
|
|
|
|
|
|
|
(frappe.qb.update(WorkflowAction) |
|
|
|
.set(WorkflowAction.status, 'Completed') |
|
|
|
.set(WorkflowAction.completed_by, user) |
|
|
|
.set(WorkflowAction.completed_by_role, workflow_action[0].role) |
|
|
|
.where(WorkflowAction.name == workflow_action[0].name) |
|
|
|
).run() |
|
|
|
|
|
|
|
def clear_old_workflow_actions_using_user(doc, user=None): |
|
|
|
user = user if user else frappe.session.user |
|
|
|
|
|
|
|
if frappe.db.has_column('Workflow Action', 'user'): |
|
|
|
frappe.db.delete("Workflow Action", { |
|
|
|
"reference_name": doc.get("name"), |
|
|
|
"reference_doctype": doc.get("doctype"), |
|
|
|
"status": "Open", |
|
|
|
"user": ("!=", user) |
|
|
|
}) |
|
|
|
|
|
|
|
def update_completed_workflow_actions_using_user(doc, user=None): |
|
|
|
user = user or frappe.session.user |
|
|
|
|
|
|
|
if frappe.db.has_column('Workflow Action', 'user'): |
|
|
|
WorkflowAction = DocType("Workflow Action") |
|
|
|
(frappe.qb.update(WorkflowAction) |
|
|
|
.set(WorkflowAction.status, 'Completed') |
|
|
|
.set(WorkflowAction.completed_by, user) |
|
|
|
.where((WorkflowAction.reference_name == doc.get('name')) |
|
|
|
& (WorkflowAction.reference_doctype == doc.get('doctype')) |
|
|
|
& (WorkflowAction.status == 'Open') |
|
|
|
& (WorkflowAction.user == user)) |
|
|
|
).run() |
|
|
|
|
|
|
|
def get_next_possible_transitions(workflow_name, state, doc=None): |
|
|
|
transitions = frappe.get_all('Workflow Transition', |
|
|
@@ -167,8 +253,10 @@ def get_next_possible_transitions(workflow_name, state, doc=None): |
|
|
|
return transitions_to_return |
|
|
|
|
|
|
|
def get_users_next_action_data(transitions, doc): |
|
|
|
roles = set() |
|
|
|
user_data_map = {} |
|
|
|
for transition in transitions: |
|
|
|
roles.add(transition.allowed) |
|
|
|
users = get_users_with_role(transition.allowed) |
|
|
|
filtered_users = filter_allowed_users(users, doc, transition) |
|
|
|
for user in filtered_users: |
|
|
@@ -182,19 +270,24 @@ def get_users_next_action_data(transitions, doc): |
|
|
|
'action_name': transition.action, |
|
|
|
'action_link': get_workflow_action_url(transition.action, doc, user) |
|
|
|
})) |
|
|
|
return user_data_map |
|
|
|
return user_data_map, roles |
|
|
|
|
|
|
|
|
|
|
|
def create_workflow_actions_for_users(users, doc): |
|
|
|
for user in users: |
|
|
|
frappe.get_doc({ |
|
|
|
'doctype': 'Workflow Action', |
|
|
|
'reference_doctype': doc.get('doctype'), |
|
|
|
'reference_name': doc.get('name'), |
|
|
|
'workflow_state': get_doc_workflow_state(doc), |
|
|
|
'status': 'Open', |
|
|
|
'user': user |
|
|
|
}).insert(ignore_permissions=True) |
|
|
|
def create_workflow_actions_for_roles(roles, doc): |
|
|
|
workflow_action = frappe.get_doc({ |
|
|
|
'doctype': 'Workflow Action', |
|
|
|
'reference_doctype': doc.get('doctype'), |
|
|
|
'reference_name': doc.get('name'), |
|
|
|
'workflow_state': get_doc_workflow_state(doc), |
|
|
|
'status': 'Open', |
|
|
|
}) |
|
|
|
|
|
|
|
for role in roles: |
|
|
|
workflow_action.append('permitted_roles', { |
|
|
|
'role': role |
|
|
|
}) |
|
|
|
|
|
|
|
workflow_action.insert(ignore_permissions=True) |
|
|
|
|
|
|
|
def send_workflow_action_email(users_data, doc): |
|
|
|
common_args = get_common_email_args(doc) |
|
|
@@ -249,18 +342,20 @@ def get_confirm_workflow_action_url(doc, action, user): |
|
|
|
def is_workflow_action_already_created(doc): |
|
|
|
return frappe.db.exists({ |
|
|
|
'doctype': 'Workflow Action', |
|
|
|
'reference_doctype': doc.get('doctype'), |
|
|
|
'reference_name': doc.get('name'), |
|
|
|
'workflow_state': get_doc_workflow_state(doc) |
|
|
|
'reference_doctype': doc.get('doctype'), |
|
|
|
'workflow_state': get_doc_workflow_state(doc), |
|
|
|
}) |
|
|
|
|
|
|
|
def clear_workflow_actions(doctype, name): |
|
|
|
if not (doctype and name): |
|
|
|
return |
|
|
|
frappe.db.delete("Workflow Action", { |
|
|
|
"reference_doctype": doctype, |
|
|
|
"reference_name": name |
|
|
|
}) |
|
|
|
frappe.db.delete("Workflow Action", filters = { |
|
|
|
"reference_name": name, |
|
|
|
"reference_doctype": doctype, |
|
|
|
} |
|
|
|
) |
|
|
|
|
|
|
|
def get_doc_workflow_state(doc): |
|
|
|
workflow_name = get_workflow_name(doc.get('doctype')) |
|
|
|
workflow_state_field = get_workflow_state_field(workflow_name) |
|
|
|