瀏覽代碼

Merge pull request #17300 from resilient-tech/encrypt-otp-secrets

feat: encrypt 2FA secrets
version-14
mergify[bot] 2 年之前
committed by GitHub
父節點
當前提交
6f8be47ff9
沒有發現已知的金鑰在資料庫的簽署中 GPG Key ID: 4AEE18F83AFDEB23
共有 4 個文件被更改,包括 77 次插入13 次删除
  1. +1
    -0
      frappe/patches.txt
  2. +44
    -0
      frappe/patches/v13_0/encrypt_2fa_secrets.py
  3. +2
    -1
      frappe/tests/test_twofactor.py
  4. +30
    -12
      frappe/twofactor.py

+ 1
- 0
frappe/patches.txt 查看文件

@@ -183,6 +183,7 @@ frappe.patches.v13_0.queryreport_columns
frappe.patches.v13_0.jinja_hook
frappe.patches.v13_0.update_notification_channel_if_empty
frappe.patches.v13_0.set_first_day_of_the_week
frappe.patches.v13_0.encrypt_2fa_secrets
execute:frappe.reload_doc('custom', 'doctype', 'custom_field')
frappe.patches.v14_0.update_workspace2 # 20.09.2021
frappe.patches.v14_0.save_ratings_in_fraction #23-12-2021


+ 44
- 0
frappe/patches/v13_0/encrypt_2fa_secrets.py 查看文件

@@ -0,0 +1,44 @@
import frappe
import frappe.defaults
from frappe.cache_manager import clear_defaults_cache
from frappe.twofactor import PARENT_FOR_DEFAULTS
from frappe.utils.password import encrypt

DOCTYPE = "DefaultValue"
OLD_PARENT = "__default"


def execute():
table = frappe.qb.DocType(DOCTYPE)

# set parent for `*_otplogin`
(
frappe.qb.update(table)
.set(table.parent, PARENT_FOR_DEFAULTS)
.where(table.parent == OLD_PARENT)
.where(table.defkey.like("%_otplogin"))
).run()

# update records for `*_otpsecret`
secrets = {
key: value
for key, value in frappe.defaults.get_defaults_for(parent=OLD_PARENT).items()
if key.endswith("_otpsecret")
}

if not secrets:
return

defvalue_cases = frappe.qb.terms.Case()

for key, value in secrets.items():
defvalue_cases.when(table.defkey == key, encrypt(value))

(
frappe.qb.update(table)
.set(table.parent, PARENT_FOR_DEFAULTS)
.set(table.defvalue, defvalue_cases)
.where(table.parent == OLD_PARENT)
).run()

clear_defaults_cache()

+ 2
- 1
frappe/tests/test_twofactor.py 查看文件

@@ -12,6 +12,7 @@ from frappe.twofactor import (
authenticate_for_2factor,
confirm_otp_token,
get_cached_user_pass,
get_default,
get_otpsecret_for_,
get_verification_obj,
should_run_2fa,
@@ -111,7 +112,7 @@ class TestTwoFactor(unittest.TestCase):
def test_get_otpsecret_for_user(self):
"""OTP secret should be set for user."""
self.assertTrue(get_otpsecret_for_(self.user))
self.assertTrue(frappe.db.get_default(self.user + "_otpsecret"))
self.assertTrue(get_default(self.user + "_otpsecret"))

def test_confirm_otp_token(self):
"""Ensure otp is confirmed"""


+ 30
- 12
frappe/twofactor.py 查看文件

@@ -8,9 +8,25 @@ import pyotp
from pyqrcode import create as qrcreate

import frappe
import frappe.defaults
from frappe import _
from frappe.utils import cint, get_datetime, get_url, time_diff_in_seconds
from frappe.utils.background_jobs import enqueue
from frappe.utils.password import decrypt, encrypt

PARENT_FOR_DEFAULTS = "__2fa"


def get_default(key):
return frappe.db.get_default(key, parent=PARENT_FOR_DEFAULTS)


def set_default(key, value):
frappe.db.set_default(key, value, parent=PARENT_FOR_DEFAULTS)


def clear_default(key):
frappe.defaults.clear_default(key, parent=PARENT_FOR_DEFAULTS)


class ExpiredLoginException(Exception):
@@ -118,11 +134,13 @@ def two_factor_is_enabled_for_(user):

def get_otpsecret_for_(user):
"""Set OTP Secret for user even if not set."""
otp_secret = frappe.db.get_default(user + "_otpsecret")
if not otp_secret:
otp_secret = b32encode(os.urandom(10)).decode("utf-8")
frappe.db.set_default(user + "_otpsecret", otp_secret)
frappe.db.commit()
if otp_secret := get_default(user + "_otpsecret"):
return decrypt(otp_secret)

otp_secret = b32encode(os.urandom(10)).decode("utf-8")
set_default(user + "_otpsecret", encrypt(otp_secret))
frappe.db.commit()

return otp_secret


@@ -162,8 +180,8 @@ def confirm_otp_token(login_manager, otp=None, tmp_id=None):
totp = pyotp.TOTP(otp_secret)
if totp.verify(otp):
# show qr code only once
if not frappe.db.get_default(login_manager.user + "_otplogin"):
frappe.db.set_default(login_manager.user + "_otplogin", 1)
if not get_default(login_manager.user + "_otplogin"):
set_default(login_manager.user + "_otplogin", 1)
delete_qrimage(login_manager.user)
tracker.add_success_attempt()
return True
@@ -180,7 +198,7 @@ def get_verification_obj(user, token, otp_secret):
verification_obj = process_2fa_for_sms(user, token, otp_secret)
elif verification_method == "OTP App":
# check if this if the first time that the user is trying to login. If so, send an email
if not frappe.db.get_default(user + "_otplogin"):
if not get_default(user + "_otplogin"):
verification_obj = process_2fa_for_email(user, token, otp_secret, otp_issuer, method="OTP App")
else:
verification_obj = process_2fa_for_otp_app(user, otp_secret, otp_issuer)
@@ -207,7 +225,7 @@ def process_2fa_for_sms(user, token, otp_secret):
def process_2fa_for_otp_app(user, otp_secret, otp_issuer):
"""Process OTP App method for 2fa."""
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer)
if frappe.db.get_default(user + "_otplogin"):
if get_default(user + "_otplogin"):
otp_setup_completed = True
else:
otp_setup_completed = False
@@ -222,7 +240,7 @@ def process_2fa_for_email(user, token, otp_secret, otp_issuer, method="Email"):
message = None
status = True
prompt = ""
if method == "OTP App" and not frappe.db.get_default(user + "_otplogin"):
if method == "OTP App" and not get_default(user + "_otplogin"):
"""Sending one-time email for OTP App"""
totp_uri = pyotp.TOTP(otp_secret).provisioning_uri(user, issuer_name=otp_issuer)
qrcode_link = get_link_for_qrcode(user, totp_uri)
@@ -463,8 +481,8 @@ def reset_otp_secret(user):
otp_issuer = frappe.db.get_single_value("System Settings", "otp_issuer_name")
user_email = frappe.db.get_value("User", user, "email")

frappe.defaults.clear_default(user + "_otplogin")
frappe.defaults.clear_default(user + "_otpsecret")
clear_default(user + "_otplogin")
clear_default(user + "_otpsecret")

email_args = {
"recipients": user_email,


Loading…
取消
儲存