Co-authored-by: Chinmay D. Pai <chinmaydpai@gmail.com> Co-authored-by: Suraj Shetty <13928957+surajshetty3416@users.noreply.github.com>version-14
@@ -40,6 +40,7 @@ | |||
"password_settings", | |||
"logout_on_password_reset", | |||
"force_user_to_reset_password", | |||
"password_reset_limit", | |||
"column_break_31", | |||
"enable_password_policy", | |||
"minimum_password_score", | |||
@@ -415,6 +416,13 @@ | |||
"fieldtype": "Int", | |||
"label": "Run Jobs only Daily if Inactive For (Days)" | |||
}, | |||
{ | |||
"default": "3", | |||
"description": "Hourly rate limit for generating password reset links", | |||
"fieldname": "password_reset_limit", | |||
"fieldtype": "Int", | |||
"label": "Password Reset Link Generation Limit" | |||
}, | |||
{ | |||
"default": "1", | |||
"fieldname": "logout_on_password_reset", | |||
@@ -19,6 +19,7 @@ class TestUser(unittest.TestCase): | |||
# disable password strength test | |||
frappe.db.set_value("System Settings", "System Settings", "enable_password_policy", 0) | |||
frappe.db.set_value("System Settings", "System Settings", "minimum_password_score", "") | |||
frappe.db.set_value("System Settings", "System Settings", "password_reset_limit", 3) | |||
def test_user_type(self): | |||
new_user = frappe.get_doc(dict(doctype='User', email='test-for-type@example.com', | |||
@@ -222,6 +223,19 @@ class TestUser(unittest.TestCase): | |||
self.assertEqual(extract_mentions(comment)[0], "test_user@example.com") | |||
self.assertEqual(extract_mentions(comment)[1], "test.again@example1.com") | |||
def test_rate_limiting_for_reset_password(self): | |||
from frappe.utils.password import delete_password_reset_cache | |||
delete_password_reset_cache() | |||
frappe.db.set_value("System Settings", "System Settings", "password_reset_limit", 1) | |||
user = frappe.get_doc("User", "testperm@example.com") | |||
link = user.reset_password() | |||
self.assertRegex(link, "\/update-password\?key=[A-Za-z0-9]*") | |||
self.assertRaises(frappe.ValidationError, user.reset_password, False) | |||
def delete_contact(user): | |||
frappe.db.sql("DELETE FROM `tabContact` WHERE `email_id`= %s", user) | |||
frappe.db.sql("DELETE FROM `tabContact Email` WHERE `email_id`= %s", user) |
@@ -13,15 +13,16 @@ from frappe.utils.user import get_system_managers | |||
from bs4 import BeautifulSoup | |||
import frappe.permissions | |||
import frappe.share | |||
import re | |||
import json | |||
from frappe.website.utils import is_signup_enabled | |||
from frappe.utils.background_jobs import enqueue | |||
STANDARD_USERS = ("Guest", "Administrator") | |||
class MaxUsersReachedError(frappe.ValidationError): pass | |||
class MaxUsersReachedError(frappe.ValidationError): | |||
pass | |||
class User(Document): | |||
__new_password = None | |||
@@ -225,6 +226,11 @@ class User(Document): | |||
def reset_password(self, send_email=False, password_expired=False): | |||
from frappe.utils import random_string, get_url | |||
rate_limit = frappe.db.get_single_value("System Settings", "password_reset_limit") | |||
if rate_limit: | |||
check_password_reset_limit(self.name, rate_limit) | |||
key = random_string(32) | |||
self.db_set("reset_password_key", key) | |||
@@ -236,6 +242,7 @@ class User(Document): | |||
if send_email: | |||
self.password_reset_mail(link) | |||
update_password_reset_limit(self.name) | |||
return link | |||
def get_other_system_managers(self): | |||
@@ -1110,3 +1117,16 @@ def generate_keys(user): | |||
return {"api_secret": api_secret} | |||
frappe.throw(frappe._("Not Permitted"), frappe.PermissionError) | |||
def update_password_reset_limit(user): | |||
generated_link_count = get_generated_link_count(user) | |||
generated_link_count += 1 | |||
frappe.cache().hset("password_reset_link_count", user, generated_link_count) | |||
def check_password_reset_limit(user, rate_limit): | |||
generated_link_count = get_generated_link_count(user) | |||
if generated_link_count >= rate_limit: | |||
frappe.throw(_("You have reached the hourly limit for generating password reset links. Please try again later.")) | |||
def get_generated_link_count(user): | |||
return cint(frappe.cache().hget("password_reset_link_count", user)) or 0 |
@@ -196,7 +196,8 @@ scheduler_events = { | |||
"frappe.deferred_insert.save_to_db", | |||
"frappe.desk.form.document_follow.send_hourly_updates", | |||
"frappe.integrations.doctype.google_calendar.google_calendar.sync", | |||
"frappe.email.doctype.newsletter.newsletter.send_scheduled_email" | |||
"frappe.email.doctype.newsletter.newsletter.send_scheduled_email", | |||
"frappe.utils.password.delete_password_reset_cache" | |||
], | |||
"daily": [ | |||
"frappe.email.queue.clear_outbox", | |||
@@ -305,6 +305,7 @@ frappe.patches.v12_0.fix_email_id_formatting | |||
frappe.patches.v13_0.add_toggle_width_in_navbar_settings | |||
frappe.patches.v13_0.rename_notification_fields | |||
frappe.patches.v13_0.remove_duplicate_navbar_items | |||
frappe.patches.v12_0.set_default_password_reset_limit | |||
frappe.patches.v13_0.set_route_for_blog_category | |||
frappe.patches.v13_0.enable_custom_script | |||
frappe.patches.v13_0.update_newsletter_content_type |
@@ -0,0 +1,9 @@ | |||
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors | |||
# MIT License. See license.txt | |||
import frappe | |||
def execute(): | |||
frappe.reload_doc("core", "doctype", "system_settings", force=1) | |||
frappe.db.set_value('System Settings', None, "password_reset_limit", 3) |
@@ -50,6 +50,7 @@ def get_decrypted_password(doctype, name, fieldname='password', raise_exception= | |||
elif raise_exception: | |||
frappe.throw(_('Password not found'), frappe.AuthenticationError) | |||
def set_encrypted_password(doctype, name, pwd, fieldname='password'): | |||
try: | |||
frappe.db.sql("""insert into `__Auth` (doctype, name, fieldname, `password`, encrypted) | |||
@@ -63,6 +64,7 @@ def set_encrypted_password(doctype, name, pwd, fieldname='password'): | |||
frappe.throw("Most probably your password is too long.", exc=e) | |||
raise e | |||
def check_password(user, pwd, doctype='User', fieldname='password'): | |||
'''Checks if user and password are correct, else raises frappe.AuthenticationError''' | |||
@@ -82,11 +84,20 @@ def check_password(user, pwd, doctype='User', fieldname='password'): | |||
return user | |||
def delete_login_failed_cache(user): | |||
frappe.cache().hdel('last_login_tried', user) | |||
frappe.cache().hdel('login_failed_count', user) | |||
frappe.cache().hdel('locked_account_time', user) | |||
def delete_password_reset_cache(user=None): | |||
if user: | |||
frappe.cache().hdel('password_reset_link_count', user) | |||
else: | |||
frappe.cache().delete_key('password_reset_link_count') | |||
def update_password(user, pwd, doctype='User', fieldname='password', logout_all_sessions=False): | |||
''' | |||
Update the password for the User | |||
@@ -115,6 +126,7 @@ def update_password(user, pwd, doctype='User', fieldname='password', logout_all_ | |||
from frappe.sessions import clear_sessions | |||
clear_sessions(user=user, keep_current=True, force=True) | |||
def delete_all_passwords_for(doctype, name): | |||
try: | |||
frappe.db.sql("""delete from `__Auth` where `doctype`=%(doctype)s and `name`=%(name)s""", | |||
@@ -123,26 +135,31 @@ def delete_all_passwords_for(doctype, name): | |||
if not frappe.db.is_missing_column(e): | |||
raise | |||
def rename_password(doctype, old_name, new_name): | |||
# NOTE: fieldname is not considered, since the document is renamed | |||
frappe.db.sql("""update `__Auth` set name=%(new_name)s | |||
where doctype=%(doctype)s and name=%(old_name)s""", | |||
{ 'doctype': doctype, 'new_name': new_name, 'old_name': old_name }) | |||
def rename_password_field(doctype, old_fieldname, new_fieldname): | |||
frappe.db.sql('''update `__Auth` set fieldname=%(new_fieldname)s | |||
where doctype=%(doctype)s and fieldname=%(old_fieldname)s''', | |||
{ 'doctype': doctype, 'old_fieldname': old_fieldname, 'new_fieldname': new_fieldname }) | |||
def create_auth_table(): | |||
# same as Framework.sql | |||
frappe.db.create_auth_table() | |||
def encrypt(pwd): | |||
cipher_suite = Fernet(encode(get_encryption_key())) | |||
cipher_text = cstr(cipher_suite.encrypt(encode(pwd))) | |||
return cipher_text | |||
def decrypt(pwd): | |||
try: | |||
cipher_suite = Fernet(encode(get_encryption_key())) | |||
@@ -152,6 +169,7 @@ def decrypt(pwd): | |||
# encryption_key in site_config is changed and not valid | |||
frappe.throw(_('Encryption key is invalid, Please check site_config.json')) | |||
def get_encryption_key(): | |||
from frappe.installer import update_site_config | |||