Sfoglia il codice sorgente

refactor: Use Query Builder instead of raw queries

version-14
Gavin D'souza 3 anni fa
parent
commit
53f92e5b5c
2 ha cambiato i file con 70 aggiunte e 28 eliminazioni
  1. +1
    -0
      frappe/query_builder/__init__.py
  2. +69
    -28
      frappe/utils/password.py

+ 1
- 0
frappe/query_builder/__init__.py Vedi File

@@ -1 +1,2 @@
from pypika import *
from frappe.query_builder.utils import get_query_builder, patch_query_execute

+ 69
- 28
frappe/utils/password.py Vedi File

@@ -4,12 +4,16 @@
import string
import frappe
from frappe import _
from frappe.query_builder import Table
from frappe.utils import cstr, encode
from cryptography.fernet import Fernet, InvalidToken
from passlib.hash import pbkdf2_sha256, mysql41
from passlib.registry import register_crypt_handler
from passlib.context import CryptContext

Auth = Table("__Auth")


class LegacyPassword(pbkdf2_sha256):
name = "frappe_legacy"
ident = "$frappel$"
@@ -36,25 +40,52 @@ passlibctx = CryptContext(
)


def get_decrypted_password(doctype, name, fieldname='password', raise_exception=True):
auth = frappe.db.sql('''select `password` from `__Auth`
where doctype=%(doctype)s and name=%(name)s and fieldname=%(fieldname)s and encrypted=1''',
{ 'doctype': doctype, 'name': name, 'fieldname': fieldname })
def get_decrypted_password(doctype, name, fieldname="password", raise_exception=True):
result = (
frappe.qb.from_(Auth)
.select(Auth.password)
.where(
(Auth.doctype == doctype)
& (Auth.name == name)
& (Auth.fieldname == fieldname)
& (Auth.encrypted == 1)
)
.limit(1)
.run()
)

if auth and auth[0][0]:
return decrypt(auth[0][0])
if result and result[0][0]:
return decrypt(result[0][0])

elif raise_exception:
frappe.throw(_('Password not found'), frappe.AuthenticationError)
frappe.throw(_("Password not found"), frappe.AuthenticationError)


def set_encrypted_password(doctype, name, pwd, fieldname="password"):
query = (
frappe.qb.into(Auth)
.columns(Auth.doctype, Auth.name, Auth.fieldname, Auth.password, Auth.encrypted)
.insert(doctype, name, fieldname, encrypt(pwd), 1)
)

# TODO: Simplify this via aliasing methods in `frappe.qb`
if frappe.db.db_type == "mariadb":
query = (
query.on_duplicate_key_update(Auth.doctype, doctype)
.on_duplicate_key_update(Auth.name, name)
.on_duplicate_key_update(Auth.fieldname, fieldname)
)
elif frappe.db.db_type == "postgres":
query = (
query.on_conflict(Auth.doctype, Auth.name, Auth.fieldname)
.do_update(Auth.doctype, doctype)
.do_update(Auth.name, name)
.do_update(Auth.fieldname, fieldname)
)


def set_encrypted_password(doctype, name, pwd, fieldname='password'):
try:
frappe.db.sql("""insert into `__Auth` (doctype, name, fieldname, `password`, encrypted)
values (%(doctype)s, %(name)s, %(fieldname)s, %(pwd)s, 1)
{on_duplicate_update} `password`=%(pwd)s, encrypted=1""".format(
on_duplicate_update=frappe.db.get_on_duplicate_update(['doctype', 'name', 'fieldname'])
), { 'doctype': doctype, 'name': name, 'fieldname': fieldname, 'pwd': encrypt(pwd) })
query.run()

except frappe.db.DataError as e:
if frappe.db.is_data_too_long(e):
frappe.throw(_("Most probably your password is too long."), exc=e)
@@ -68,34 +99,44 @@ def remove_encrypted_password(doctype, name, fieldname='password'):
"fieldname": fieldname
})

def check_password(user, pwd, doctype='User', fieldname='password', delete_tracker_cache=True):
'''Checks if user and password are correct, else raises frappe.AuthenticationError'''

auth = frappe.db.sql("""select `name`, `password` from `__Auth`
where `doctype`=%(doctype)s and `name`=%(name)s and `fieldname`=%(fieldname)s and `encrypted`=0""",
{'doctype': doctype, 'name': user, 'fieldname': fieldname}, as_dict=True)

if not auth or not passlibctx.verify(pwd, auth[0].password):
raise frappe.AuthenticationError(_('Incorrect User or Password'))
def check_password(user, pwd, doctype="User", fieldname="password", delete_tracker_cache=True):
"""Checks if user and password are correct, else raises frappe.AuthenticationError"""

result = (
frappe.qb.from_(Auth)
.select(Auth.name, Auth.password)
.where(
(Auth.doctype == doctype)
& (Auth.name == user)
& (Auth.fieldname == fieldname)
& (Auth.encrypted == 0)
)
.limit(1)
.run(as_dict=True)
)

if not result or not passlibctx.verify(pwd, result[0].password):
raise frappe.AuthenticationError(_("Incorrect User or Password"))

# lettercase agnostic
user = auth[0].name
user = result[0].name

# TODO: This need to be deleted after checking side effects of it.
# We have a `LoginAttemptTracker` that can take care of tracking related cache.
if delete_tracker_cache:
delete_login_failed_cache(user)

if not passlibctx.needs_update(auth[0].password):
if not passlibctx.needs_update(result[0].password):
update_password(user, pwd, doctype, fieldname)

return user


def delete_login_failed_cache(user):
frappe.cache().hdel('last_login_tried', user)
frappe.cache().hdel('login_failed_count', user)
frappe.cache().hdel('locked_account_time', user)
frappe.cache().hdel("last_login_tried", user)
frappe.cache().hdel("login_failed_count", user)
frappe.cache().hdel("locked_account_time", user)


def update_password(user, pwd, doctype='User', fieldname='password', logout_all_sessions=False):
'''


Caricamento…
Annulla
Salva