|
@@ -20,7 +20,7 @@ from frappe.core.doctype.authentication_log.authentication_log import add_authen |
|
|
|
|
|
|
|
|
from urllib import quote |
|
|
from urllib import quote |
|
|
|
|
|
|
|
|
import pyotp |
|
|
|
|
|
|
|
|
import pyotp,base64,os |
|
|
|
|
|
|
|
|
class HTTPRequest: |
|
|
class HTTPRequest: |
|
|
def __init__(self): |
|
|
def __init__(self): |
|
@@ -116,118 +116,104 @@ class LoginManager: |
|
|
self.make_session() |
|
|
self.make_session() |
|
|
self.set_user_info() |
|
|
self.set_user_info() |
|
|
|
|
|
|
|
|
|
|
|
def two_factor_auth_user(self): |
|
|
|
|
|
''' Check if user has 2fa role and set otpsecret and verification method''' |
|
|
|
|
|
two_factor_user_role = 0 |
|
|
|
|
|
user_obj = frappe.get_doc('User', self.user) |
|
|
|
|
|
if user_obj.roles: |
|
|
|
|
|
query = """select name from `tabRole` where two_factor_auth=1 |
|
|
|
|
|
and name in ("All",{0}) limit 1""".format(', '.join('\"{}\"'.format(i.role) for \ |
|
|
|
|
|
i in user_obj.roles)) |
|
|
|
|
|
two_factor_user_role = len(frappe.db.sql(query)) |
|
|
|
|
|
|
|
|
|
|
|
self.otp_secret = frappe.db.get_default(self.user + '_otpsecret') |
|
|
|
|
|
if not self.otp_secret: |
|
|
|
|
|
self.otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8') |
|
|
|
|
|
frappe.db.set_default(self.user + '_otpsecret', self.otp_secret) |
|
|
|
|
|
frappe.db.commit() |
|
|
|
|
|
|
|
|
|
|
|
self.verification_method = frappe.db.get_value('System Settings', None, 'two_factor_method') |
|
|
|
|
|
|
|
|
|
|
|
return bool(two_factor_user_role) |
|
|
|
|
|
|
|
|
|
|
|
def get_verification_obj(self): |
|
|
|
|
|
if self.verification_method == 'SMS': |
|
|
|
|
|
user_phone = frappe.db.get_value('User', self.user, ['phone','mobile_no'], as_dict=1) |
|
|
|
|
|
usr_phone = user_phone.mobile_no or user_phone.phone |
|
|
|
|
|
status = self.send_token_via_sms(token=token, phone_no=usr_phone, otpsecret=self.otp_secret) |
|
|
|
|
|
verification_obj = {'token_delivery': status, |
|
|
|
|
|
'prompt': status and 'Enter verification code sent to {}'.format(usr_phone[:4] + '******' + usr_phone[-3:]), |
|
|
|
|
|
'method': 'SMS'} |
|
|
|
|
|
elif self.verification_method == 'OTP App': |
|
|
|
|
|
totp_uri = pyotp.TOTP(self.otp_secret).provisioning_uri(self.user, issuer_name="Estate Manager") |
|
|
|
|
|
|
|
|
|
|
|
if frappe.db.get_default(self.user + '_otplogin'): |
|
|
|
|
|
otp_setup_completed = True |
|
|
|
|
|
else: |
|
|
|
|
|
otp_setup_completed = False |
|
|
|
|
|
|
|
|
|
|
|
verification_obj = {'token_delivery': True, |
|
|
|
|
|
'prompt': False, |
|
|
|
|
|
'totp_uri': totp_uri, |
|
|
|
|
|
'method': 'OTP App', |
|
|
|
|
|
'qrcode': get_qr_svg_code(totp_uri), |
|
|
|
|
|
'otp_setup_completed': otp_setup_completed} |
|
|
|
|
|
elif self.verification_method == 'Email': |
|
|
|
|
|
status = self.send_token_via_email(token=token,otpsecret=self.otp_secret) |
|
|
|
|
|
verification_obj = {'token_delivery': status, |
|
|
|
|
|
'prompt': status and 'Enter verification code sent to your registered email address', |
|
|
|
|
|
'method': 'Email'} |
|
|
|
|
|
return verification_obj |
|
|
|
|
|
|
|
|
|
|
|
def process_2fa(self): |
|
|
|
|
|
if self.two_factor_auth_user(): |
|
|
|
|
|
token = int(pyotp.TOTP(self.otp_secret).now()) |
|
|
|
|
|
verification_obj = self.get_verification_obj() |
|
|
|
|
|
|
|
|
|
|
|
tmp_id = frappe.generate_hash(length=8) |
|
|
|
|
|
usr = frappe.form_dict.get('usr') |
|
|
|
|
|
pwd = frappe.form_dict.get('pwd') |
|
|
|
|
|
|
|
|
|
|
|
if self.verification_method in ['SMS', 'Email']: |
|
|
|
|
|
frappe.cache().set(tmp_id + '_token',token) |
|
|
|
|
|
frappe.cache().expire(tmp_id + '_token',300) |
|
|
|
|
|
|
|
|
|
|
|
frappe.cache().set(tmp_id + '_usr', usr) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_pwd', pwd) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_otp_secret', self.otp_secret) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_user', self.user) |
|
|
|
|
|
|
|
|
|
|
|
for field in [tmp_id + nm for nm in ['_usr', '_pwd', '_otp_secret', '_user']]: |
|
|
|
|
|
frappe.cache().expire(field,180) |
|
|
|
|
|
|
|
|
|
|
|
frappe.local.response['verification'] = verification_obj |
|
|
|
|
|
frappe.local.response['tmp_id'] = tmp_id |
|
|
|
|
|
|
|
|
|
|
|
raise frappe.RequestToken |
|
|
|
|
|
else: |
|
|
|
|
|
self.post_login(no_two_auth=True) |
|
|
|
|
|
|
|
|
def login(self): |
|
|
def login(self): |
|
|
# clear cache |
|
|
# clear cache |
|
|
frappe.clear_cache(user = frappe.form_dict.get('usr')) |
|
|
frappe.clear_cache(user = frappe.form_dict.get('usr')) |
|
|
|
|
|
|
|
|
if frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth') == unicode(0): |
|
|
|
|
|
self.authenticate() |
|
|
|
|
|
self.post_login(no_two_auth=True) |
|
|
|
|
|
|
|
|
otp = frappe.form_dict.get('otp') |
|
|
|
|
|
if otp: |
|
|
|
|
|
try: |
|
|
|
|
|
tmp_info = { |
|
|
|
|
|
'usr': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr'), |
|
|
|
|
|
'pwd': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd') |
|
|
|
|
|
} |
|
|
|
|
|
self.authenticate(user=tmp_info['usr'], pwd=tmp_info['pwd']) |
|
|
|
|
|
except: |
|
|
|
|
|
pass |
|
|
|
|
|
self.post_login() |
|
|
else: |
|
|
else: |
|
|
otp = frappe.form_dict.get('otp') |
|
|
|
|
|
if not otp: |
|
|
|
|
|
self.authenticate() |
|
|
|
|
|
# after authenticate, self.user is set (from check_password() call) |
|
|
|
|
|
user_obj = frappe.get_doc('User', self.user) |
|
|
|
|
|
two_factor_auth_user = 0 |
|
|
|
|
|
if user_obj.roles: |
|
|
|
|
|
query = """select name from `tabRole` where two_factor_auth=1 |
|
|
|
|
|
and name in ("All"{0}) limit 1""".format(', '.join('\"{}\"'.format(i.role) for \ |
|
|
|
|
|
i in user_obj.roles)) |
|
|
|
|
|
two_factor_auth_user = len(frappe.db.sql(query)) |
|
|
|
|
|
|
|
|
|
|
|
if two_factor_auth_user >= 1: |
|
|
|
|
|
|
|
|
|
|
|
otp_secret = frappe.db.get_default(self.user + '_otpsecret') |
|
|
|
|
|
verification_method = frappe.db.get_value('System Settings', None, 'two_factor_method') |
|
|
|
|
|
|
|
|
|
|
|
if otp_secret: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
token = int(pyotp.TOTP(otp_secret).now()) |
|
|
|
|
|
|
|
|
|
|
|
if verification_method == 'SMS': |
|
|
|
|
|
user_phone = frappe.db.get_value('User', self.user, ['phone','mobile_no'], as_dict=1) |
|
|
|
|
|
usr_phone = user_phone.mobile_no or user_phone.phone |
|
|
|
|
|
status = self.send_token_via_sms(token=token, phone_no=usr_phone, otpsecret=otp_secret) |
|
|
|
|
|
verification_obj = {'token_delivery': status, |
|
|
|
|
|
'prompt': status and 'Enter verification code sent to {}'.format(usr_phone[:4] + '******' + usr_phone[-3:]), |
|
|
|
|
|
'method': 'SMS'} |
|
|
|
|
|
elif verification_method == 'OTP App': |
|
|
|
|
|
|
|
|
|
|
|
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(self.user, issuer_name="Estate Manager") |
|
|
|
|
|
|
|
|
|
|
|
if frappe.db.get_default(self.user + '_otplogin'): |
|
|
|
|
|
otp_setup_completed = True |
|
|
|
|
|
else: |
|
|
|
|
|
otp_setup_completed = False |
|
|
|
|
|
|
|
|
|
|
|
verification_obj = {'token_delivery': True, |
|
|
|
|
|
'prompt': False, |
|
|
|
|
|
'totp_uri': totp_uri, |
|
|
|
|
|
'method': 'OTP App', |
|
|
|
|
|
'qrcode': get_qr_svg_code(totp_uri), |
|
|
|
|
|
'otp_setup_completed': otp_setup_completed} |
|
|
|
|
|
elif verification_method == 'Email': |
|
|
|
|
|
status = self.send_token_via_email(token=token,otpsecret=otp_secret) |
|
|
|
|
|
verification_obj = {'token_delivery': status, |
|
|
|
|
|
'prompt': status and 'Enter verification code sent to your registered email address', |
|
|
|
|
|
'method': 'Email'} |
|
|
|
|
|
frappe.local.response['verification'] = verification_obj |
|
|
|
|
|
else: |
|
|
|
|
|
import os |
|
|
|
|
|
import base64 |
|
|
|
|
|
otp_secret = base64.b32encode(os.urandom(10)).decode('utf-8') |
|
|
|
|
|
totp_uri = pyotp.totp.TOTP(otp_secret).provisioning_uri(self.user, issuer_name="Estate Manager") |
|
|
|
|
|
|
|
|
|
|
|
# not actual token but counter ( hotp.at(counter) gives token) |
|
|
|
|
|
token = int(pyotp.TOTP(otp_secret).now()) |
|
|
|
|
|
|
|
|
|
|
|
frappe.local.response['verification'] = { |
|
|
|
|
|
'method_first_time': True, |
|
|
|
|
|
'method': verification_method, |
|
|
|
|
|
'token_delivery': True, |
|
|
|
|
|
'prompt': False, |
|
|
|
|
|
'totp_uri': totp_uri, |
|
|
|
|
|
'qrcode':get_qr_svg_code(totp_uri), |
|
|
|
|
|
'otp_setup_completed': False, |
|
|
|
|
|
#'restrict_method': int(restrict_method) and (fixed_method[0].default_method or 'OTP App') |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
tmp_id = frappe.generate_hash(length=8) |
|
|
|
|
|
usr = frappe.form_dict.get('usr') |
|
|
|
|
|
pwd = frappe.form_dict.get('pwd') |
|
|
|
|
|
|
|
|
|
|
|
if verification_method in ['SMS', 'Email']: |
|
|
|
|
|
frappe.cache().set(tmp_id + '_token',token) |
|
|
|
|
|
frappe.cache().expire(tmp_id + '_token',300) |
|
|
|
|
|
|
|
|
|
|
|
frappe.cache().set(tmp_id + '_usr', usr) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_pwd', pwd) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_otp_secret', otp_secret) |
|
|
|
|
|
frappe.cache().set(tmp_id + '_user', self.user) |
|
|
|
|
|
|
|
|
|
|
|
for field in [tmp_id + nm for nm in ['_usr', '_pwd', '_otp_secret', '_user']]: |
|
|
|
|
|
frappe.cache().expire(field,180) |
|
|
|
|
|
|
|
|
|
|
|
frappe.local.response['tmp_id'] = tmp_id |
|
|
|
|
|
|
|
|
|
|
|
raise frappe.RequestToken |
|
|
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
|
self.post_login(no_two_auth=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.authenticate() |
|
|
|
|
|
if frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth') == unicode(1): |
|
|
|
|
|
self.process_2fa() |
|
|
else: |
|
|
else: |
|
|
try: |
|
|
|
|
|
tmp_info = { |
|
|
|
|
|
'usr': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr'), |
|
|
|
|
|
'pwd': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd') |
|
|
|
|
|
} |
|
|
|
|
|
self.authenticate(user=tmp_info['usr'], pwd=tmp_info['pwd']) |
|
|
|
|
|
except: |
|
|
|
|
|
pass |
|
|
|
|
|
# frappe.log_error(frappe.get_traceback(),"AUTHENTICATION PROBLEM") |
|
|
|
|
|
self.post_login() |
|
|
|
|
|
|
|
|
self.post_login(no_two_auth=True) |
|
|
|
|
|
|
|
|
def post_login(self,no_two_auth=False): |
|
|
def post_login(self,no_two_auth=False): |
|
|
self.run_trigger('on_login') |
|
|
self.run_trigger('on_login') |
|
@@ -242,7 +228,7 @@ class LoginManager: |
|
|
self.make_session() |
|
|
self.make_session() |
|
|
self.set_user_info() |
|
|
self.set_user_info() |
|
|
|
|
|
|
|
|
def confirm_token(self,otp=None, tmp_id=None, hotp_token=False): |
|
|
|
|
|
|
|
|
def confirm_token(self, otp=None, tmp_id=None, hotp_token=False): |
|
|
try: |
|
|
try: |
|
|
otp_secret = frappe.cache().get(tmp_id + '_otp_secret') |
|
|
otp_secret = frappe.cache().get(tmp_id + '_otp_secret') |
|
|
if not otp_secret: |
|
|
if not otp_secret: |
|
@@ -253,9 +239,6 @@ class LoginManager: |
|
|
if hotp_token: |
|
|
if hotp_token: |
|
|
u_hotp = pyotp.HOTP(otp_secret) |
|
|
u_hotp = pyotp.HOTP(otp_secret) |
|
|
if u_hotp.verify(otp, int(hotp_token)): |
|
|
if u_hotp.verify(otp, int(hotp_token)): |
|
|
if not frappe.db.get_default(self.user + '_otpsecret'): |
|
|
|
|
|
frappe.db.set_default(self.user + '_otpsecret', otp_secret) |
|
|
|
|
|
|
|
|
|
|
|
frappe.cache().delete(tmp_id + '_token') |
|
|
frappe.cache().delete(tmp_id + '_token') |
|
|
return True |
|
|
return True |
|
|
else: |
|
|
else: |
|
@@ -264,8 +247,6 @@ class LoginManager: |
|
|
totp = pyotp.TOTP(otp_secret) |
|
|
totp = pyotp.TOTP(otp_secret) |
|
|
if totp.verify(otp): |
|
|
if totp.verify(otp): |
|
|
# show qr code only once |
|
|
# show qr code only once |
|
|
if not frappe.db.get_default(self.user + '_otpsecret'): |
|
|
|
|
|
frappe.db.set_default(self.user + '_otpsecret', otp_secret) |
|
|
|
|
|
if not frappe.db.get_default(self.user + '_otplogin'): |
|
|
if not frappe.db.get_default(self.user + '_otplogin'): |
|
|
frappe.db.set_default(self.user + '_otplogin', 1) |
|
|
frappe.db.set_default(self.user + '_otplogin', 1) |
|
|
return True |
|
|
return True |
|
|