@@ -5,31 +5,27 @@ from __future__ import unicode_literals
import frappe
from frappe import _
import pyotp,base64, os
import pyotp, os
from frappe.utils.background_jobs import enqueue
from jinja2 import Template
from pyqrcode import create as qrcreate
from StringIO import StringIO
from base64 import b64encode,b32encode
from base64 import b64encode, b32encode
from frappe.utils import get_url, get_datetime, time_diff_in_seconds
from frappe.installer import update_site_config
class ExpiredLoginException(Exception): pass
class ExpiredLoginException(Exception):pass
def toggle_two_factor_auth(state,roles=[]):
def toggle_two_factor_auth(state, roles=[]):
'''Enable or disable 2FA in site_config and roles'''
update_site_config('enable_two_factor_auth',state )
frappe.db.set_value('System Settings', None, 'enable_two_factor_auth', 1)
for role in roles:
role = frappe.get_doc('Role',{'role_name':role})
role = frappe.get_doc('Role', {'role_name': role})
role.two_factor_auth = state
role.save(ignore_permissions=True)
def two_factor_is_enabled(user=None):
'''Returns True if 2FA is enabled.'''
enabled = frappe.local.conf.get('enable_two_factor_auth',False )
enabled = frappe.db.get_value('System Settings', None, 'enable_two_factor_auth' )
if not user or not enabled:
return enabled
return two_factor_is_enabled_for_(user)
@@ -38,7 +34,6 @@ def should_run_2fa(user):
'''Check if 2fa should run.'''
return two_factor_is_enabled(user=user)
def get_cached_user_pass():
'''Get user and password if set.'''
user = pwd = None
@@ -46,23 +41,22 @@ def get_cached_user_pass():
if tmp_id:
user = frappe.cache().get(tmp_id+'_usr')
pwd = frappe.cache().get(tmp_id+'_pwd')
return (user,pwd)
return (user, pwd)
def authenticate_for_2factor(user):
'''Authenticate two factor for enabled user before login.'''
if frappe.form_dict.get('otp'):return
if frappe.form_dict.get('otp'):
return
otp_secret = get_otpsecret_for_(user)
verification_method = frappe.db.get_value('System Settings', None, 'two_factor_method')
token = int(pyotp.TOTP(otp_secret).now())
tmp_id = frappe.generate_hash(length=8)
cache_2fa_data(user,token,otp_secret,tmp_id)
verification_obj = get_verification_obj(user,token,otp_secret)
cache_2fa_data(user, token, otp_secret, tmp_id)
verification_obj = get_verification_obj(user, token, otp_secret)
# Save data in local
frappe.local.response['verification'] = verification_obj
frappe.local.response['tmp_id'] = tmp_id
def cache_2fa_data(user,token,otp_secret,tmp_id):
def cache_2fa_data(user, token, otp_secret, tmp_id):
'''Cache and set expiry for data.'''
pwd = frappe.form_dict.get('pwd')
verification_method = get_verification_method()
@@ -74,20 +68,24 @@ def cache_2fa_data(user,token,otp_secret,tmp_id):
frappe.cache().expire(tmp_id + '_token', expiry_time)
else:
expiry_time = 180
for k,v in {'_usr':user,'_pwd':pwd,'_otp_secret':otp_secret}.iteritems():
frappe.cache().set("{0}{1}".format(tmp_id,k),v)
frappe.cache().expire("{0}{1}".format(tmp_id,k),expiry_time)
for k, v in {'_usr': user, '_pwd': pwd, '_otp_secret': otp_secret}.iteritems():
frappe.cache().set("{0}{1}".format(tmp_id, k), v)
frappe.cache().expire("{0}{1}".format(tmp_id, k), expiry_time)
def two_factor_is_enabled_for_(user):
'''Check if 2factor is enabled for user.'''
if isinstance(user,basestring):
user = frappe.get_doc('User',user)
if user.roles:
query = """select name from `tabRole` where two_factor_auth=1
and name in ("All",{0});""".format(', '.join('\"{}\"'.format(i.role) for \
i in user.roles))
if len(frappe.db.sql(query)) > 0:
return True
if isinstance(user, basestring):
user = frappe.get_doc('User', user)
roles = [frappe.db.escape(d.role) for d in user.roles or []]
roles.append('All')
query = """select name from `tabRole` where two_factor_auth=1
and name in ({0}) limit 1""".format(', '.join('\"{}\"'.format(i) for \
i in roles))
if len(frappe.db.sql(query)) > 0:
return True
return False
def get_otpsecret_for_(user):
@@ -102,9 +100,7 @@ def get_otpsecret_for_(user):
def get_verification_method():
return frappe.db.get_value('System Settings', None, 'two_factor_method')
def confirm_otp_token(login_manager,otp=None,tmp_id=None):
def confirm_otp_token(login_manager, otp=None, tmp_id=None):
'''Confirm otp matches.'''
if not otp:
otp = frappe.form_dict.get('otp')
@@ -119,7 +115,7 @@ def confirm_otp_token(login_manager,otp=None,tmp_id=None):
if not otp_secret:
raise ExpiredLoginException(_('Login session expired, refresh page to retry'))
hotp = pyotp.HOTP(otp_secret)
if hotp_token:
if hotp_token:
if hotp.verify(otp, int(hotp_token)):
frappe.cache().delete(tmp_id + '_token')
return True
@@ -137,35 +133,37 @@ def confirm_otp_token(login_manager,otp=None,tmp_id=None):
login_manager.fail(_('Incorrect Verification code'), login_manager.user)
def get_verification_obj(user,token,otp_secret):
def get_verification_obj(user, token, otp_secret):
otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name')
verification_method = get_verification_method()
verification_obj = None
if verification_method == 'SMS':
verification_obj = process_2fa_for_sms(user,token,otp_secret)
verification_obj = process_2fa_for_sms(user, token, otp_secret)
elif verification_method == 'OTP App':
#check if this if the first time that the user is trying to login. If so, send an email
if not frappe.db.get_default(user + '_otplogin'):
verification_obj = process_2fa_for_email(user,token,otp_secret,otp_issuer,method='OTP App')
verification_obj = process_2fa_for_email(user, token, otp_secret, otp_issuer, method='OTP App')
else:
verification_obj = process_2fa_for_otp_app(user,otp_secret,otp_issuer)
verification_obj = process_2fa_for_otp_app(user, otp_secret, otp_issuer)
elif verification_method == 'Email':
verification_obj = process_2fa_for_email(user,token,otp_secret,otp_issuer)
verification_obj = process_2fa_for_email(user, token, otp_secret, otp_issuer)
return verification_obj
def process_2fa_for_sms(user,token,otp_secret):
def process_2fa_for_sms(user, token, otp_secret):
'''Process sms method for 2fa.'''
phone = frappe.db.get_value('User', user, ['phone','mobile_no'], as_dict=1)
phone = frappe.db.get_value('User', user, ['phone', 'mobile_no'], as_dict=1)
phone = phone.mobile_no or phone.phone
status = send_token_via_sms(otp_secret,token=token, phone_no=phone)
verification_obj = {'token_delivery': status,
'prompt': status and 'Enter verification code sent to {}'.format(phone[:4] + '******' + phone[-3:]),
'method': 'SMS',
'setup': status}
status = send_token_via_sms(otp_secret, token=token, phone_no=phone)
verification_obj = {
'token_delivery': status,
'prompt': status and 'Enter verification code sent to {}'.format(phone[:4] + '******' + phone[-3:]),
'method': 'SMS',
'setup': status
}
return verification_obj
def process_2fa_for_otp_app(user,otp_secret,otp_issuer):
def process_2fa_for_otp_app(user, otp_secret, otp_issuer):
'''Process OTP App method for 2fa.'''
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer)
if frappe.db.get_default(user + '_otplogin'):
@@ -173,13 +171,15 @@ def process_2fa_for_otp_app(user,otp_secret,otp_issuer):
else:
otp_setup_completed = False
verification_obj = {'totp_uri': totp_uri,
'method': 'OTP App',
'qrcode': get_qr_svg_code(totp_uri),
'setup': otp_setup_completed }
verification_obj = {
'totp_uri': totp_uri,
'method': 'OTP App',
'qrcode': get_qr_svg_code(totp_uri),
'setup': otp_setup_completed
}
return verification_obj
def process_2fa_for_email(user,token,otp_secret,otp_issuer,method='Email'):
def process_2fa_for_email(user, token, otp_secret, otp_issuer, method='Email'):
'''Process Email method for 2fa.'''
subject = None
message = None
@@ -188,51 +188,53 @@ def process_2fa_for_email(user,token,otp_secret,otp_issuer,method='Email'):
if method == 'OTP App' and not frappe.db.get_default(user + '_otplogin'):
'''Sending one-time email for OTP App'''
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer)
qrcode_link = get_link_for_qrcode(user,totp_uri)
message = get_email_body_for_qr_code({'qrcode_link':qrcode_link})
subject = get_email_subject_for_qr_code({'qrcode_link':qrcode_link})
prompt = 'Please check your registered email address for instructions on how to proceed. Do not close this window as you will have to return to it!!'
qrcode_link = get_link_for_qrcode(user, totp_uri)
message = get_email_body_for_qr_code({'qrcode_link': qrcode_link})
subject = get_email_subject_for_qr_code({'qrcode_link': qrcode_link})
prompt = _( 'Please check your registered email address for instructions on how to proceed. Do not close this window as you will have to return to it.')
else:
'''Sending email verification'''
prompt = 'Verification code has been sent to your registered email address.'
status = send_token_via_email(user,token,otp_secret,otp_issuer,subject=subject,message=message)
verification_obj = {'token_delivery': status,
'prompt': status and prompt,
'method': 'Email',
'setup': status}
prompt = _('Verification code has been sent to your registered email address.')
status = send_token_via_email(user, token, otp_secret, otp_issuer, subject=subject, message=message)
verification_obj = {
'token_delivery': status,
'prompt': status and prompt,
'method': 'Email',
'setup': status
}
return verification_obj
def get_email_subject_for_2fa(kwargs_dict):
'''Get email subject for 2fa.'''
subject_template = 'Verification Code from {}'.format(frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name'))
subject = render_string_template(subject_template,kwargs_dict)
subject_template = _( 'Login Verification Code from {}') .format(frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name'))
subject = render_string_template(subject_template, kwargs_dict)
return subject
def get_email_body_for_2fa(kwargs_dict):
'''Get email body for 2fa.'''
body_template = 'Use this token to login <br> {{otp}} '
body = render_string_template(body_template,kwargs_dict)
body_template = 'Enter this code to complete your login:<br><br> <b>{{otp}}</b> '
body = render_string_template(body_template, kwargs_dict)
return body
def get_email_subject_for_qr_code(kwargs_dict):
'''Get QRCode email subject.'''
subject_template = 'OTP Registration Code from {}'.format(frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name'))
subject = render_string_template(subject_template,kwargs_dict)
subject_template = _( 'One Time Password (OTP) Registration Code from {}') .format(frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name'))
subject = render_string_template(subject_template, kwargs_dict)
return subject
def get_email_body_for_qr_code(kwargs_dict):
'''Get QRCode email body.'''
body_template = 'Please click on the following link and follow the instructions on the page.<br> {{qrcode_link}}'
body = render_string_template(body_template,kwargs_dict)
body_template = 'Please click on the following link and follow the instructions on the page.<br><br> {{qrcode_link}}'
body = render_string_template(body_template, kwargs_dict)
return body
def render_string_template(_str,kwargs_dict):
def render_string_template(_str, kwargs_dict):
'''Render string with jinja.'''
s = Template(_str)
s = s.render(**kwargs_dict)
return s
def get_link_for_qrcode(user,totp_uri):
def get_link_for_qrcode(user, totp_uri):
'''Get link to temporary page showing QRCode.'''
key = frappe.generate_hash(length=20)
key_user = "{}_user".format(key)
@@ -240,8 +242,8 @@ def get_link_for_qrcode(user,totp_uri):
lifespan = int(frappe.db.get_value('System Settings', 'System Settings', 'lifespan_qrcode_image'))
if lifespan<=0:
lifespan = 240
frappe.cache().set_value(key_uri,totp_uri,expires_in_sec=lifespan)
frappe.cache().set_value(key_user,user,expires_in_sec=lifespan)
frappe.cache().set_value(key_uri, totp_uri, expires_in_sec=lifespan)
frappe.cache().set_value(key_user, user, expires_in_sec=lifespan)
return get_url('/qrcode?k={}'.format(key))
def send_token_via_sms(otpsecret, token=None, phone_no=None):
@@ -258,7 +260,7 @@ def send_token_via_sms(otpsecret, token=None, phone_no=None):
ss = frappe.get_doc('SMS Settings', 'SMS Settings')
if not ss.sms_gateway_url:
return False
hotp = pyotp.HOTP(otpsecret)
args = {ss.message_parameter: 'Your verification code is {}'.format(hotp.at(int(token))), ss.sms_sender_name: otp_issuer}
for d in ss.get("parameters"):
@@ -266,28 +268,35 @@ def send_token_via_sms(otpsecret, token=None, phone_no=None):
args[ss.receiver_parameter] = phone_no
sms_args = {'gateway_url':ss.sms_gateway_url,'params':args}
sms_args = {'gateway_url': ss.sms_gateway_url, 'params': args}
enqueue(method=send_request, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **sms_args)
return True
def send_token_via_email(user, token, otp_secret, otp_issuer,subject=None,message=None):
def send_token_via_email(user, token, otp_secret, otp_issuer, subject=None, message=None):
'''Send token to user as email.'''
user_email = frappe.db.get_value('User', user, 'email')
if not user_email:
return False
hotp = pyotp.HOTP(otp_secret)
otp = hotp.at(int(token))
template_args = {'otp':otp,'otp_issuer':otp_issuer}
template_args = {'otp': otp, 'otp_issuer': otp_issuer}
if not subject:
subject = get_email_subject_for_2fa(template_args)
if not message:
message = get_email_body_for_2fa(template_args)
email_args = {
'recipients':user_email, 'sender':None, 'subject':subject,
'message':message,
'delayed':False, 'retry':3 }
enqueue(method=frappe.sendmail, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **email_args)
email_args = {
'recipients': user_email,
'sender': None,
'subject': subject,
'message': message,
'header': [_('Verfication Code'), 'blue'],
'delayed': False,
'retry':3
}
enqueue(method=frappe.sendmail, queue='short',
timeout=300, event=None, async=True, job_name=None, now=False, **email_args)
return True
def get_qr_svg_code(totp_uri):
@@ -297,62 +306,62 @@ def get_qr_svg_code(totp_uri):
stream = StringIO()
try:
url.svg(stream, scale=4, background="#eee", module_color="#222")
svg = stream.getvalue().replace('\n','')
svg = stream.getvalue().replace('\n', '')
svg = b64encode(bytes(svg))
finally:
stream.close()
return svg
def qrcode_as_png(user,totp_uri):
def qrcode_as_png(user, totp_uri):
'''Save temporary Qrcode to server.'''
from frappe.utils.file_manager import save_file
folder = create_barcode_folder()
png_file_name = '{}.png'.format(frappe.generate_hash(length=20))
file_obj = save_file(png_file_name,png_file_name,'User',user,folder=folder)
file_obj = save_file(png_file_name, png_file_name, 'User', user, folder=folder)
frappe.db.commit()
file_url = get_url(file_obj.file_url)
file_path = os.path.join(frappe.get_site_path('public', 'files'),file_obj.file_name)
url = qrcreate(totp_uri)
with open(file_path,'w') as png_file:
url.png(png_file,scale=8, module_color=[0, 0, 0, 180], background=[0xff, 0xff, 0xcc])
file_path = os.path.join(frappe.get_site_path('public', 'files'), file_obj.file_name)
url = qrcreate(totp_uri)
with open(file_path, 'w') as png_file:
url.png(png_file, scale=8, module_color=[0, 0, 0, 180], background=[0xff, 0xff, 0xcc])
return file_url
def create_barcode_folder():
'''Get Barcodes folder.'''
folder_name = 'Barcodes'
folder = frappe.db.exists('File',{'file_name':folder_name})
folder = frappe.db.exists('File', {'file_name': folder_name})
if folder:
return folder
folder = frappe.get_doc({
'doctype':'File',
'file_name':folder_name,
'doctype': 'File',
'file_name': folder_name,
'is_folder':1,
'folder':'Home'
'folder': 'Home'
})
folder.insert(ignore_permissions=True)
return folder.name
def delete_qrimage(user,check_expiry=False):
def delete_qrimage(user, check_expiry=False):
'''Delete Qrimage when user logs in.'''
user_barcodes = frappe.get_all('File',{'attached_to_doctype':'User',
'attached_to_name':user,'folder':'Home/Barcodes'})
user_barcodes = frappe.get_all('File', {'attached_to_doctype': 'User',
'attached_to_name': user, 'folder': 'Home/Barcodes'})
for barcode in user_barcodes:
if check_expiry and not should_remove_barcode_image(barcode):continue
barcode = frappe.get_doc('File',barcode.name)
frappe.delete_doc('File',barcode.name,ignore_permissions=True)
if check_expiry and not should_remove_barcode_image(barcode): continue
barcode = frappe.get_doc('File', barcode.name)
frappe.delete_doc('File', barcode.name, ignore_permissions=True)
def delete_all_barcodes_for_users():
'''Task to delete all barcodes for user.'''
users = frappe.get_all('User',{'enabled':1})
users = frappe.get_all('User', {'enabled':1})
for user in users:
delete_qrimage(user.name,check_expiry=True)
delete_qrimage(user.name, check_expiry=True)
def should_remove_barcode_image(barcode):
'''Check if it's time to delete barcode image from server. '''
if isinstance(barcode, basestring):
barcode = frappe.get_doc('File',barcode)
barcode = frappe.get_doc('File', barcode)
lifespan = frappe.db.get_value('System Settings', 'System Settings', 'lifespan_qrcode_image')
if time_diff_in_seconds(get_datetime(),barcode.creation) > int(lifespan):
if time_diff_in_seconds(get_datetime(), barcode.creation) > int(lifespan):
return True
return False