Procházet zdrojové kódy

[WIP][Refactor] Redo twofactor code

version-14
B H Boma před 8 roky
rodič
revize
a8b526bfd1
3 změnil soubory, kde provedl 319 přidání a 28 odebrání
  1. +13
    -28
      frappe/auth.py
  2. +92
    -0
      frappe/tests/test_twofactor.py
  3. +214
    -0
      frappe/twofactor.py

+ 13
- 28
frappe/auth.py Zobrazit soubor

@@ -17,6 +17,7 @@ from frappe.translate import get_lang_code
from frappe.utils.password import check_password
from frappe.core.doctype.authentication_log.authentication_log import add_authentication_log
from frappe.utils.background_jobs import enqueue
from twofactor import validate_2fa_if_set, confirm_otp_token


from urllib import quote
@@ -102,7 +103,7 @@ class LoginManager:
self.user_type = None

if frappe.local.form_dict.get('cmd')=='login' or frappe.local.request.path=="/api/method/login":
self.login()
if not self.login():return
self.resume = False

# run login triggers
@@ -193,44 +194,28 @@ class LoginManager:
frappe.local.response['verification'] = verification_obj
frappe.local.response['tmp_id'] = tmp_id

raise frappe.RequestToken
# raise frappe.RequestToken
else:
self.post_login(no_two_auth=True)

def login(self):
# clear cache
frappe.clear_cache(user = frappe.form_dict.get('usr'))
self.authenticate()
otp = validate_2fa_if_set(self.user)
return self.post_login()

otp = frappe.form_dict.get('otp')
if otp:
try:
tmp_info = {
'usr': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr'),
'pwd': frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd')
}
self.authenticate(user=tmp_info['usr'], pwd=tmp_info['pwd'])
except:
pass
self.post_login()
else:
self.authenticate()
if (self.user != 'Administrator') and (frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth') == unicode(1)):
self.process_2fa()
else:
self.post_login(no_two_auth=True)

def post_login(self,no_two_auth=False):
def post_login(self,otp=None):
self.run_trigger('on_login')
self.validate_ip_address()
self.validate_hour()
if frappe.form_dict.get('otp') and not no_two_auth:
hotp_token = frappe.cache().get(frappe.form_dict.get('tmp_id') + '_token')
self.confirm_token(otp=frappe.form_dict.get('otp'), tmp_id=frappe.form_dict.get('tmp_id'), hotp_token=hotp_token)
self.make_session()
self.set_user_info()
else:
self.make_session()
self.set_user_info()
if not confirm_otp_token(self,otp):
return False
self.make_session()
self.set_user_info()
return True


def confirm_token(self, otp=None, tmp_id=None, hotp_token=False):
try:


+ 92
- 0
frappe/tests/test_twofactor.py Zobrazit soubor

@@ -0,0 +1,92 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt
from __future__ import unicode_literals

import unittest, frappe
from werkzeug.wrappers import Request
from werkzeug.test import EnvironBuilder
from frappe.auth import LoginManager, HTTPRequest
from frappe.website import render


class TestTwoFactor(unittest.TestCase):


def setUp(self):
self.http_requests = create_http_request()
self.login_manager = frappe.local.login_manager
self.user = self.login_manager.user
print self.user

def test_debug(self):
pass

# def test_two_factor_auth_user(self):
# '''Test OTP secret and verification method is initiated.'''
# two_factor_role = self.login_manager.two_factor_auth_user()
# otp_secret = frappe.db.get_default('test@erpnext.com_otpsecret')
# self.assertFalse(two_factor_role)
# toggle_2fa_all_role(True)
# two_factor_role = self.login_manager.two_factor_auth_user()
# self.assertTrue(two_factor_role)
# self.assertNotEqual(otp_secret,None)
# self.assertEqual(self.login_manager.verification_method,'OTP App')
# frappe.db.set_default('test@erpnext.com_otpsecret', None)
# toggle_2fa_all_role(False)

# def test_get_verification_obj(self):
# '''Auth url should be present in verification object.'''
# verification_obj = self.login_manager.get_verification_obj()
# self.assertIn('otpauth://',verification_obj['totp_uri'])
# self.assertTrue(len(verification_obj['qrcode']) > 1 )

# def test_process_2fa(self):
# self.login_manager.process_2fa()
# toggle_2fa_all_role(True)
# print self.login_manager.info
# # print frappe.local.response['verification']
# # self.assertTrue(False not in [i in frappe.local.response['verification'] \
# # for i in ['totp_uri','method','qrcode','setup']])
# toggle_2fa_all_role(False)

# def test_confirm_token(self):
# pass

# def test_send_token_via_sms(self):
# pass

# def test_send_token_via_email(self):
# pass



def set_request(**kwargs):
builder = EnvironBuilder(**kwargs)
frappe.local.request = Request(builder.get_environ())

def create_http_request():
'''Get http request object.'''
set_request(method='POST', path='login')
enable_2fa()
frappe.form_dict['usr'] = 'test@erpnext.com'
frappe.form_dict['pwd'] = 'test'
frappe.local.form_dict['cmd'] = 'login'
http_requests = HTTPRequest()
return http_requests

def enable_2fa():
'''Enable Two factor in system settings.'''
system_settings = frappe.get_doc('System Settings')
system_settings.enable_two_factor_auth = True
system_settings.two_factor_method = 'OTP App'
system_settings.save(ignore_permissions=True)
frappe.db.commit()

def toggle_2fa_all_role(state=None):
all_role = frappe.get_doc('Role','All')
if state == None:
state = False if all_role.two_factor_auth == True else False
if state not in [True,False]:return
all_role.two_factor_auth = state
all_role.save(ignore_permissions=True)
frappe.db.commit()

+ 214
- 0
frappe/twofactor.py Zobrazit soubor

@@ -0,0 +1,214 @@
# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
# MIT License. See license.txt

from __future__ import unicode_literals

import frappe
import pyotp,base64,os
from frappe.utils.background_jobs import enqueue
from pyqrcode import create as qrcreate
from StringIO import StringIO
from base64 import b64encode,b32encode



def validate_2fa_if_set(user):
'''Check for 2fa if set in settings.'''
site_otp_enabled = frappe.db.get_value('System Settings', 'System Settings', 'enable_two_factor_auth')
user_otp_enabled = two_factor_is_enabled_for_(user)
#Don't validate for Admin of if not enabled
if user =='Administrator' or not site_otp_enabled or not user_otp_enabled:
return (None,None,None)
otp = frappe.form_dict.get('otp')
if otp:
user = frappe.cache().get(frappe.form_dict.get('tmp_id')+'_usr')
pwd = frappe.cache().get(frappe.form_dict.get('tmp_id')+'_pwd')
return (user,pwd,otp)
authenticate_for_2factor(user)


def authenticate_for_2factor(user):
'''Authenticate two factor for enabled user before login.'''
otp_secret = get_otpsecret_for_(user)
verification_method = frappe.db.get_value('System Settings', None, 'two_factor_method')
token = int(pyotp.TOTP(otp_secret).now())
tmp_id = frappe.generate_hash(length=8)
cache_2fa_data(user,token,otp_secret,tmp_id)
verification_obj = get_verification_obj(user,token,otp_secret)
# Save data in local
frappe.local.response['verification'] = verification_obj
frappe.local.response['tmp_id'] = tmp_id

def cache_2fa_data(user,token,otp_secret,tmp_id):
'''Cache and set expiry for data.'''
pwd = frappe.form_dict.get('pwd')
verification_method = get_verification_method()

# set increased expiry time for SMS and Email
if verification_method in ['SMS', 'Email']:
expiry_time = 300
frappe.cache().set(tmp_id + '_token', token)
frappe.cache().expire(tmp_id + '_token', expiry_time)
else:
expiry_time = 180
for k,v in {'_usr':user,'_pwd':pwd,'_otp_secret':otp_secret}.iteritems():
frappe.cache().set("{0}{1}".format(tmp_id,k),v)
frappe.cache().expire("{0}{1}".format(tmp_id,k),expiry_time)

def two_factor_is_enabled_for_(user):
'''Check if 2factor is enabled for user.'''
if isinstance(user,basestring):
user = frappe.get_doc('User',user)
if user.roles:
query = """select name from `tabRole` where two_factor_auth=1
and name in ("All",{0});""".format(', '.join('\"{}\"'.format(i.role) for \
i in user.roles))
if len(frappe.db.sql(query)) > 0:
return True
return False

def get_otpsecret_for_(user):
'''Set OTP Secret for user even if not set.'''
otp_secret = frappe.db.get_default(user + '_otpsecret')
if not otp_secret:
otp_secret = b32encode(os.urandom(10)).decode('utf-8')
frappe.db.set_default(user + '_otpsecret', otp_secret)
frappe.db.commit()
return otp_secret

def get_verification_method():
return frappe.db.get_value('System Settings', None, 'two_factor_method')



def confirm_otp_token(login_manager, otp):
'''Confirm otp matches.'''
if not otp:
if two_factor_is_enabled_for_(login_manager.user):
return False
return True
hotp_token = frappe.cache().get(frappe.form_dict.get('tmp_id') + '_token')
tmp_id = frappe.form_dict.get('tmp_id')
otp_secret = frappe.cache().get(tmp_id + '_otp_secret')
if not otp_secret:
frappe.throw('Login session expired. Refresh page to try again')
hotp = pyotp.HOTP(otp_secret)
if hotp_token:
if hotp.verify(otp, int(hotp_token)):
frappe.cache().delete(tmp_id + '_token')
return
else:
login_manager.fail('Incorrect Verification code', login_manager.user)

totp = pyotp.TOTP(otp_secret)
if totp.verify(otp):
# show qr code only once
if not frappe.db.get_default(login_manager.user + '_otplogin'):
frappe.db.set_default(login_manager.user + '_otplogin', 1)
return True
else:
login_manager.fail('Incorrect Verification code', user)


def get_verification_obj(user,token,otp_secret):
otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name')
verification_method = get_verification_method()
verification_obj = None
if verification_method == 'SMS':
verification_obj = process_2fa_for_sms(user,token,otp_secret)
elif verification_method == 'OTP App':
verification_obj = process_2fa_for_otp_app(user,otp_secret,otp_issuer)
elif verification_method == 'Email':
process_2fa_for_email(user,token,otp_secret,otp_issuer)
return verification_obj


def process_2fa_for_sms(user,token,otp_secret):
'''Process sms method for 2fa.'''
phone = frappe.db.get_value('User', user, ['phone','mobile_no'], as_dict=1)
phone = phone.mobile_no or phone.phone
status = send_token_via_sms(otp_secret,token=token, phone_no=phone)
verification_obj = {'token_delivery': status,
'prompt': status and 'Enter verification code sent to {}'.format(phone[:4] + '******' + phone[-3:]),
'method': 'SMS'}
return verification_obj

def process_2fa_for_otp_app(user,otp_secret,otp_issuer):
'''Process OTP App method for 2fa.'''
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer)
if frappe.db.get_default(user + '_otplogin'):
otp_setup_completed = True
else:
otp_setup_completed = False

verification_obj = {'totp_uri': totp_uri,
'method': 'OTP App',
'qrcode': get_qr_svg_code(totp_uri),
'setup': otp_setup_completed }
return verification_obj

def process_2fa_for_email(user,token,otp_secret,otp_issuer):
'''Process Email method for 2fa.'''
status = send_token_via_email(user,token,otp_secret,otp_issuer)
verification_obj = {'token_delivery': status,
'prompt': status and 'Enter verification code sent to your registered email address',
'method': 'Email'}
return verification_obj



def send_token_via_sms(self, otpsecret, token=None, phone_no=None):
'''Send token as sms to user.'''
otp_issuer = frappe.db.get_value('System Settings', 'System Settings', 'otp_issuer_name')
try:
from frappe.core.doctype.sms_settings.sms_settings import send_request
except:
return False

if not phone_no:
return False

ss = frappe.get_doc('SMS Settings', 'SMS Settings')
if not ss.sms_gateway_url:
return False
hotp = pyotp.HOTP(otpsecret)
args = {ss.message_parameter: 'Your verification code is {}'.format(hotp.at(int(token))), ss.sms_sender_name: otp_issuer}
for d in ss.get("parameters"):
args[d.parameter] = d.value

args[ss.receiver_parameter] = phone_no

sms_args = {'gateway_url':ss.sms_gateway_url,'params':args}
enqueue(method=send_request, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **sms_args)
return True

def send_token_via_email(user, token, otp_secret, otp_issuer):
'''Send token to user as email.'''
user_email = frappe.db.get_value('User', user, 'email')
if not user_email:
return False
hotp = pyotp.HOTP(otp_secret)
email_args = {
'recipients':user_email, 'sender':None, 'subject':'Verification Code from {}'.format(otp_issuer or "Frappe Framework"),
'message':'<p>Your verification code is {}.</p>'.format(hotp.at(int(token))),
'delayed':False, 'retry':3 }

enqueue(method=frappe.sendmail, queue='short', timeout=300, event=None, async=True, job_name=None, now=False, **email_args)
return True



def get_qr_svg_code(totp_uri):
'''Get SVG code to display Qrcode for OTP.'''
url = qrcreate(totp_uri)
stream = StringIO()
url.svg(stream, scale=3)
svg = stream.getvalue().replace('\n','')
svg = b64encode(bytes(svg))
return svg






Načítá se…
Zrušit
Uložit