diff --git a/frappe/auth.py b/frappe/auth.py index f6823d7b2d..8e5c7bc875 100644 --- a/frappe/auth.py +++ b/frappe/auth.py @@ -17,6 +17,7 @@ from frappe.translate import get_lang_code from frappe.utils.password import check_password from frappe.core.doctype.authentication_log.authentication_log import add_authentication_log from frappe.utils.background_jobs import enqueue +from twofactor import validate_2fa_if_set, confirm_otp_token from urllib import quote @@ -102,7 +103,7 @@ class LoginManager: self.user_type = None if frappe.local.form_dict.get('cmd')=='login' or frappe.local.request.path=="/api/method/login": - self.login() + if not self.login():return self.resume = False # run login triggers @@ -193,44 +194,28 @@ class LoginManager: frappe.local.response['verification'] = verification_obj frappe.local.response['tmp_id'] = tmp_id - raise frappe.RequestToken + # raise frappe.RequestToken else: self.post_login(no_two_auth=True) def login(self): # clear cache frappe.clear_cache(user = frappe.form_dict.get('usr')) + self.authenticate() + otp = validate_2fa_if_set(self.user) + return self.post_login() - otp = frappe.form_dict.get('otp') - if otp: - try: - tmp_info = { - 'usr': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr'), - 'pwd': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd') - } - self.authenticate(user=tmp_info['usr'], pwd=tmp_info['pwd']) - except: - pass - self.post_login() - else: - self.authenticate() - if (self.user != 'Administrator') and (frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth') == unicode(1)): - self.process_2fa() - else: - self.post_login(no_two_auth=True) - def post_login(self,no_two_auth=False): + def post_login(self,otp=None): self.run_trigger('on_login') self.validate_ip_address() self.validate_hour() - if frappe.form_dict.get('otp') and not no_two_auth: - hotp_token = frappe.cache().get(frappe.form_dict.get('tmp_id') + '_token') - self.confirm_token(otp=frappe.form_dict.get('otp'), tmp_id=frappe.form_dict.get('tmp_id'), hotp_token=hotp_token) - self.make_session() - self.set_user_info() - else: - self.make_session() - self.set_user_info() + if not confirm_otp_token(self,otp): + return False + self.make_session() + self.set_user_info() + return True + def confirm_token(self, otp=None, tmp_id=None, hotp_token=False): try: diff --git a/frappe/tests/test_twofactor.py b/frappe/tests/test_twofactor.py new file mode 100644 index 0000000000..ef86b016d7 --- /dev/null +++ b/frappe/tests/test_twofactor.py @@ -0,0 +1,92 @@ +# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# MIT License. See license.txt +from __future__ import unicode_literals + +import unittest, frappe +from werkzeug.wrappers import Request +from werkzeug.test import EnvironBuilder +from frappe.auth import LoginManager, HTTPRequest +from frappe.website import render + + +class TestTwoFactor(unittest.TestCase): + + + def setUp(self): + self.http_requests = create_http_request() + self.login_manager = frappe.local.login_manager + self.user = self.login_manager.user + print self.user + + def test_debug(self): + pass + + # def test_two_factor_auth_user(self): + # '''Test OTP secret and verification method is initiated.''' + # two_factor_role = self.login_manager.two_factor_auth_user() + # otp_secret = frappe.db.get_default('test@erpnext.com_otpsecret') + # self.assertFalse(two_factor_role) + # toggle_2fa_all_role(True) + # two_factor_role = self.login_manager.two_factor_auth_user() + # self.assertTrue(two_factor_role) + # self.assertNotEqual(otp_secret,None) + # self.assertEqual(self.login_manager.verification_method,'OTP App') + # frappe.db.set_default('test@erpnext.com_otpsecret', None) + # toggle_2fa_all_role(False) + + # def test_get_verification_obj(self): + # '''Auth url should be present in verification object.''' + # verification_obj = self.login_manager.get_verification_obj() + # self.assertIn('otpauth://',verification_obj['totp_uri']) + # self.assertTrue(len(verification_obj['qrcode']) > 1 ) + + # def test_process_2fa(self): + # self.login_manager.process_2fa() + # toggle_2fa_all_role(True) + # print self.login_manager.info + # # print frappe.local.response['verification'] + # # self.assertTrue(False not in [i in frappe.local.response['verification'] \ + # # for i in ['totp_uri','method','qrcode','setup']]) + # toggle_2fa_all_role(False) + + # def test_confirm_token(self): + # pass + + # def test_send_token_via_sms(self): + # pass + + # def test_send_token_via_email(self): + # pass + + + +def set_request(**kwargs): + builder = EnvironBuilder(**kwargs) + frappe.local.request = Request(builder.get_environ()) + +def create_http_request(): + '''Get http request object.''' + set_request(method='POST', path='login') + enable_2fa() + frappe.form_dict['usr'] = 'test@erpnext.com' + frappe.form_dict['pwd'] = 'test' + frappe.local.form_dict['cmd'] = 'login' + http_requests = HTTPRequest() + return http_requests + +def enable_2fa(): + '''Enable Two factor in system settings.''' + system_settings = frappe.get_doc('System Settings') + system_settings.enable_two_factor_auth = True + system_settings.two_factor_method = 'OTP App' + system_settings.save(ignore_permissions=True) + frappe.db.commit() + +def toggle_2fa_all_role(state=None): + all_role = frappe.get_doc('Role','All') + if state == None: + state = False if all_role.two_factor_auth == True else False + if state not in [True,False]:return + all_role.two_factor_auth = state + all_role.save(ignore_permissions=True) + frappe.db.commit() diff --git a/frappe/twofactor.py b/frappe/twofactor.py new file mode 100644 index 0000000000..b05f29428b --- /dev/null +++ b/frappe/twofactor.py @@ -0,0 +1,214 @@ +# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors +# MIT License. See license.txt + +from __future__ import unicode_literals + +import frappe +import pyotp,base64,os +from frappe.utils.background_jobs import enqueue +from pyqrcode import create as qrcreate +from StringIO import StringIO +from base64 import b64encode,b32encode + + + +def validate_2fa_if_set(user): + '''Check for 2fa if set in settings.''' + site_otp_enabled = frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth') + user_otp_enabled = two_factor_is_enabled_for_(user) + #Don't validate for Admin of if not enabled + if user =='Administrator' or not site_otp_enabled or not user_otp_enabled: + return (None,None,None) + otp = frappe.form_dict.get('otp') + if otp: + user = frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr') + pwd = frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd') + return (user,pwd,otp) + authenticate_for_2factor(user) + + +def authenticate_for_2factor(user): + '''Authenticate two factor for enabled user before login.''' + otp_secret = get_otpsecret_for_(user) + verification_method = frappe.db.get_value('System Settings', None, 'two_factor_method') + token = int(pyotp.TOTP(otp_secret).now()) + tmp_id = frappe.generate_hash(length=8) + cache_2fa_data(user,token,otp_secret,tmp_id) + verification_obj = get_verification_obj(user,token,otp_secret) + # Save data in local + frappe.local.response['verification'] = verification_obj + frappe.local.response['tmp_id'] = tmp_id + +def cache_2fa_data(user,token,otp_secret,tmp_id): + '''Cache and set expiry for data.''' + pwd = frappe.form_dict.get('pwd') + verification_method = get_verification_method() + + # set increased expiry time for SMS and Email + if verification_method in ['SMS', 'Email']: + expiry_time = 300 + frappe.cache().set(tmp_id + '_token', token) + frappe.cache().expire(tmp_id + '_token', expiry_time) + else: + expiry_time = 180 + for k,v in {'_usr':user,'_pwd':pwd,'_otp_secret':otp_secret}.iteritems(): + frappe.cache().set("{0}{1}".format(tmp_id,k),v) + frappe.cache().expire("{0}{1}".format(tmp_id,k),expiry_time) + +def two_factor_is_enabled_for_(user): + '''Check if 2factor is enabled for user.''' + if isinstance(user,basestring): + user = frappe.get_doc('User',user) + if user.roles: + query = """select name from `tabRole` where two_factor_auth=1 + and name in ("All",{0});""".format(', '.join('\"{}\"'.format(i.role) for \ + i in user.roles)) + if len(frappe.db.sql(query)) > 0: + return True + return False + +def get_otpsecret_for_(user): + '''Set OTP Secret for user even if not set.''' + otp_secret = frappe.db.get_default(user + '_otpsecret') + if not otp_secret: + otp_secret = b32encode(os.urandom(10)).decode('utf-8') + frappe.db.set_default(user + '_otpsecret', otp_secret) + frappe.db.commit() + return otp_secret + +def get_verification_method(): + return frappe.db.get_value('System Settings', None, 'two_factor_method') + + + +def confirm_otp_token(login_manager, otp): + '''Confirm otp matches.''' + if not otp: + if two_factor_is_enabled_for_(login_manager.user): + return False + return True + hotp_token = frappe.cache().get(frappe.form_dict.get('tmp_id') + '_token') + tmp_id = frappe.form_dict.get('tmp_id') + otp_secret = frappe.cache().get(tmp_id + '_otp_secret') + if not otp_secret: + frappe.throw('Login session expired. Refresh page to try again') + hotp = pyotp.HOTP(otp_secret) + if hotp_token: + if hotp.verify(otp, int(hotp_token)): + frappe.cache().delete(tmp_id + '_token') + return + else: + login_manager.fail('Incorrect Verification code', login_manager.user) + + totp = pyotp.TOTP(otp_secret) + if totp.verify(otp): + # show qr code only once + if not frappe.db.get_default(login_manager.user + '_otplogin'): + frappe.db.set_default(login_manager.user + '_otplogin', 1) + return True + else: + login_manager.fail('Incorrect Verification code', user) + + +def get_verification_obj(user,token,otp_secret): + otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name') + verification_method = get_verification_method() + verification_obj = None + if verification_method == 'SMS': + verification_obj = process_2fa_for_sms(user,token,otp_secret) + elif verification_method == 'OTP App': + verification_obj = process_2fa_for_otp_app(user,otp_secret,otp_issuer) + elif verification_method == 'Email': + process_2fa_for_email(user,token,otp_secret,otp_issuer) + return verification_obj + + +def process_2fa_for_sms(user,token,otp_secret): + '''Process sms method for 2fa.''' + phone = frappe.db.get_value('User', user, ['phone','mobile_no'], as_dict=1) + phone = phone.mobile_no or phone.phone + status = send_token_via_sms(otp_secret,token=token, phone_no=phone) + verification_obj = {'token_delivery': status, + 'prompt': status and 'Enter verification code sent to {}'.format(phone[:4] + '******' + phone[-3:]), + 'method': 'SMS'} + return verification_obj + +def process_2fa_for_otp_app(user,otp_secret,otp_issuer): + '''Process OTP App method for 2fa.''' + totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer) + if frappe.db.get_default(user + '_otplogin'): + otp_setup_completed = True + else: + otp_setup_completed = False + + verification_obj = {'totp_uri': totp_uri, + 'method': 'OTP App', + 'qrcode': get_qr_svg_code(totp_uri), + 'setup': otp_setup_completed } + return verification_obj + +def process_2fa_for_email(user,token,otp_secret,otp_issuer): + '''Process Email method for 2fa.''' + status = send_token_via_email(user,token,otp_secret,otp_issuer) + verification_obj = {'token_delivery': status, + 'prompt': status and 'Enter verification code sent to your registered email address', + 'method': 'Email'} + return verification_obj + + + +def send_token_via_sms(self, otpsecret, token=None, phone_no=None): + '''Send token as sms to user.''' + otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name') + try: + from frappe.core.doctype.sms_settings.sms_settings import send_request + except: + return False + + if not phone_no: + return False + + ss = frappe.get_doc('SMS Settings', 'SMS Settings') + if not ss.sms_gateway_url: + return False + + hotp = pyotp.HOTP(otpsecret) + args = {ss.message_parameter: 'Your verification code is {}'.format(hotp.at(int(token))), ss.sms_sender_name: otp_issuer} + for d in ss.get("parameters"): + args[d.parameter] = d.value + + args[ss.receiver_parameter] = phone_no + + sms_args = {'gateway_url':ss.sms_gateway_url,'params':args} + enqueue(method=send_request, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **sms_args) + return True + +def send_token_via_email(user, token, otp_secret, otp_issuer): + '''Send token to user as email.''' + user_email = frappe.db.get_value('User', user, 'email') + if not user_email: + return False + hotp = pyotp.HOTP(otp_secret) + email_args = { + 'recipients':user_email, 'sender':None, 'subject':'Verification Code from {}'.format(otp_issuer or "Frappe Framework"), + 'message':'

Your verification code is {}.

'.format(hotp.at(int(token))), + 'delayed':False, 'retry':3 } + + enqueue(method=frappe.sendmail, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **email_args) + return True + + + +def get_qr_svg_code(totp_uri): + '''Get SVG code to display Qrcode for OTP.''' + url = qrcreate(totp_uri) + stream = StringIO() + url.svg(stream, scale=3) + svg = stream.getvalue().replace('\n','') + svg = b64encode(bytes(svg)) + return svg + + + + +