|
|
@@ -10,7 +10,47 @@ from urllib.parse import urljoin |
|
|
|
import frappe |
|
|
|
from frappe.integrations.doctype.social_login_key.test_social_login_key import create_or_update_social_login_key |
|
|
|
|
|
|
|
test_dependencies = ['Connected App', 'OAuth Client', 'User'] |
|
|
|
|
|
|
|
def get_user(usr, pwd): |
|
|
|
user = frappe.new_doc('User') |
|
|
|
user.email = usr |
|
|
|
user.enabled = 1 |
|
|
|
user.first_name = "_Test" |
|
|
|
user.new_password = pwd |
|
|
|
user.roles = [] |
|
|
|
user.append('roles', { |
|
|
|
'doctype': 'Has Role', |
|
|
|
'parentfield': 'roles', |
|
|
|
'role': 'System Manager' |
|
|
|
}) |
|
|
|
user.insert() |
|
|
|
|
|
|
|
return user |
|
|
|
|
|
|
|
|
|
|
|
def get_connected_app(): |
|
|
|
doctype = 'Connected App' |
|
|
|
connected_app = frappe.new_doc(doctype) |
|
|
|
connected_app.provider_name = 'frappe' |
|
|
|
connected_app.scopes = [] |
|
|
|
connected_app.append('scopes', {'scope': 'all'}) |
|
|
|
connected_app.insert() |
|
|
|
|
|
|
|
return connected_app |
|
|
|
|
|
|
|
|
|
|
|
def get_oauth_client(): |
|
|
|
oauth_client = frappe.new_doc('OAuth Client') |
|
|
|
oauth_client.app_name = '_Test Connected App' |
|
|
|
oauth_client.redirect_uris = 'to be replaced' |
|
|
|
oauth_client.default_redirect_uri = 'to be replaced' |
|
|
|
oauth_client.grant_type = 'Authorization Code' |
|
|
|
oauth_client.response_type = 'Code' |
|
|
|
oauth_client.skip_authorization = 1 |
|
|
|
oauth_client.insert() |
|
|
|
|
|
|
|
return oauth_client |
|
|
|
|
|
|
|
|
|
|
|
class TestConnectedApp(unittest.TestCase): |
|
|
|
|
|
|
@@ -26,37 +66,56 @@ class TestConnectedApp(unittest.TestCase): |
|
|
|
just endpoints) are stored in "Social Login Key" so we get them from |
|
|
|
there. |
|
|
|
""" |
|
|
|
self.user_name = 'test@example.com' |
|
|
|
self.user_name = 'test-connected-app@example.com' |
|
|
|
self.user_password = 'Eastern_43A1W' |
|
|
|
|
|
|
|
connected_app = frappe.get_last_doc('Connected App') |
|
|
|
redirect_uri = connected_app.get('redirect_uri') |
|
|
|
self.user = get_user(self.user_name, self.user_password) |
|
|
|
self.connected_app = get_connected_app() |
|
|
|
self.oauth_client = get_oauth_client() |
|
|
|
social_login_key = create_or_update_social_login_key() |
|
|
|
self.base_url = social_login_key.get('base_url') |
|
|
|
|
|
|
|
frappe.db.commit() |
|
|
|
self.connected_app.reload() |
|
|
|
self.oauth_client.reload() |
|
|
|
|
|
|
|
web_application_client = frappe.get_last_doc('OAuth Client') |
|
|
|
web_application_client.update({ |
|
|
|
redirect_uri = self.connected_app.get('redirect_uri') |
|
|
|
self.oauth_client.update({ |
|
|
|
'redirect_uris': redirect_uri, |
|
|
|
'default_redirect_uri': redirect_uri |
|
|
|
}) |
|
|
|
web_application_client.save() |
|
|
|
self.oauth_client.save() |
|
|
|
|
|
|
|
social_login_key = create_or_update_social_login_key() |
|
|
|
self.base_url = social_login_key.get('base_url') |
|
|
|
|
|
|
|
connected_app.authorization_uri = urljoin(self.base_url, social_login_key.get('authorize_url')) |
|
|
|
connected_app.token_uri = urljoin(self.base_url, social_login_key.get('access_token_url')) |
|
|
|
connected_app.client_id = web_application_client.get('client_id') |
|
|
|
connected_app.client_secret = web_application_client.get('client_secret') |
|
|
|
self.connected_app = connected_app.save() |
|
|
|
self.connected_app.update({ |
|
|
|
'authorization_uri': urljoin(self.base_url, social_login_key.get('authorize_url')), |
|
|
|
'client_id': self.oauth_client.get('client_id'), |
|
|
|
'client_secret': self.oauth_client.get('client_secret'), |
|
|
|
'token_uri': urljoin(self.base_url, social_login_key.get('access_token_url')) |
|
|
|
}) |
|
|
|
self.connected_app.save() |
|
|
|
|
|
|
|
frappe.db.commit() |
|
|
|
self.connected_app.reload() |
|
|
|
self.oauth_client.reload() |
|
|
|
|
|
|
|
def test_web_application_flow(self): |
|
|
|
"""Simulate a logged in user who opens the authorization URL.""" |
|
|
|
def login(): |
|
|
|
return session.get(urljoin(self.base_url, '/api/method/login'), params={ |
|
|
|
'usr': self.user_name, |
|
|
|
'pwd': self.user_password |
|
|
|
}) |
|
|
|
|
|
|
|
session = requests.Session() |
|
|
|
session.post(urljoin(self.base_url, '/api/method/login'), data={ |
|
|
|
'usr': self.user_name, |
|
|
|
'pwd': self.user_password |
|
|
|
}) |
|
|
|
|
|
|
|
# first login of a new user on a new site fails with "401 UNAUTHORIZED" |
|
|
|
# when anybody fixes that, the two lines below can be removed |
|
|
|
first_login = login() |
|
|
|
self.assertEqual(first_login.status_code, 401) |
|
|
|
|
|
|
|
second_login = login() |
|
|
|
self.assertEqual(second_login.status_code, 200) |
|
|
|
|
|
|
|
authorization_url = self.connected_app.initiate_web_application_flow(user=self.user_name) |
|
|
|
|
|
|
|
auth_response = session.get(authorization_url) |
|
|
@@ -65,10 +124,39 @@ class TestConnectedApp(unittest.TestCase): |
|
|
|
callback_response = session.get(auth_response.url) |
|
|
|
self.assertEqual(callback_response.status_code, 200) |
|
|
|
|
|
|
|
token_cache = self.connected_app.get_token_cache(self.user_name) |
|
|
|
token = token_cache.get_password('access_token') |
|
|
|
self.token_cache = self.connected_app.get_token_cache(self.user_name) |
|
|
|
token = self.token_cache.get_password('access_token') |
|
|
|
self.assertNotEqual(token, None) |
|
|
|
|
|
|
|
oauth2_session = self.connected_app.get_oauth2_session(self.user_name) |
|
|
|
resp = oauth2_session.get(urljoin(self.base_url, '/api/method/frappe.auth.get_logged_user')) |
|
|
|
self.assertEqual(resp.json().get('message'), self.user_name) |
|
|
|
|
|
|
|
def tearDown(self): |
|
|
|
def delete_if_exists(attribute): |
|
|
|
doc = getattr(self, attribute, None) |
|
|
|
if doc: |
|
|
|
doc.delete() |
|
|
|
|
|
|
|
delete_if_exists('token_cache') |
|
|
|
delete_if_exists('connected_app') |
|
|
|
|
|
|
|
if getattr(self, 'oauth_client', None): |
|
|
|
tokens = frappe.get_all('OAuth Bearer Token', filters={ |
|
|
|
'client': self.oauth_client.name |
|
|
|
}) |
|
|
|
for token in tokens: |
|
|
|
doc = frappe.get_doc('OAuth Bearer Token', token.name) |
|
|
|
doc.delete() |
|
|
|
|
|
|
|
codes = frappe.get_all('OAuth Authorization Code', filters={ |
|
|
|
'client': self.oauth_client.name |
|
|
|
}) |
|
|
|
for code in codes: |
|
|
|
doc = frappe.get_doc('OAuth Authorization Code', code.name) |
|
|
|
doc.delete() |
|
|
|
|
|
|
|
delete_if_exists('user') |
|
|
|
delete_if_exists('oauth_client') |
|
|
|
|
|
|
|
frappe.db.commit() |