ソースを参照

refactor: Add authentication for Redis Queue

version-14
leela 4年前
コミット
db09a85183
9個のファイルの変更173行の追加51行の削除
  1. +15
    -10
      frappe/commands/redis.py
  2. +6
    -2
      frappe/commands/scheduler.py
  3. +4
    -4
      frappe/core/page/background_jobs/background_jobs.py
  4. +1
    -0
      frappe/test_runner.py
  5. +3
    -3
      frappe/tests/test_background_jobs.py
  6. +70
    -0
      frappe/tests/test_redis.py
  7. +1
    -1
      frappe/utils/__init__.py
  8. +63
    -17
      frappe/utils/background_jobs.py
  9. +10
    -14
      frappe/utils/rq.py

+ 15
- 10
frappe/commands/redis.py ファイルの表示

@@ -1,7 +1,6 @@
import os

import click
import redis

import frappe
from frappe.utils.rq import RedisQueue
@@ -9,16 +8,18 @@ from frappe.installer import update_site_config

@click.command('create-rq-users')
@click.option('--set-admin-password', is_flag=True, default=False, help='Set new Redis admin(default user) password')
@click.option('--reset-passwords', is_flag=True, default=False, help='Remove all existing passwords')
def create_rq_users(set_admin_password=False, reset_passwords=False):
@click.option('--use-rq-auth', is_flag=True, default=False, help='Enable Redis authentication for sites')
def create_rq_users(set_admin_password=False, use_rq_auth=False):
"""Create Redis Queue users and add to acl and app configs.

acl config file will be used by redis server while starting the server
and app config is used by app while connecting to redis server.
"""
acl_file_path = os.path.abspath('../config/redis_queue.acl')
acl_list, user_credentials = RedisQueue.gen_acl_list(
reset_passwords=reset_passwords, set_admin_password=set_admin_password)

with frappe.init_site():
acl_list, user_credentials = RedisQueue.gen_acl_list(
set_admin_password=set_admin_password)

with open(acl_file_path, 'w') as f:
f.writelines([acl+'\n' for acl in acl_list])
@@ -29,18 +30,22 @@ def create_rq_users(set_admin_password=False, reset_passwords=False):
site_config_path=common_site_config_path)
update_site_config("rq_password", user_credentials['bench'][1], validate=False,
site_config_path=common_site_config_path)
update_site_config("use_rq_auth", use_rq_auth, validate=False,
site_config_path=common_site_config_path)

click.secho('* ACL and site configs are updated with new user credentials. '
'Please restart Redis Queue server to enable namespaces.',
fg='green')

if set_admin_password:
env_key = 'RQ_ADMIN_PASWORD'
click.secho('Redis admin password is successfully set up. '
click.secho('* Redis admin password is successfully set up. '
'Include below line in .bashrc file for system to use',
fg='green'
)
fg='green')
click.secho(f"`export {env_key}={user_credentials['default'][1]}`")
click.secho('NOTE: Please save the admin password as you '
'can not access redis server without the password',
fg='yellow'
)
fg='yellow')


commands = [


+ 6
- 2
frappe/commands/scheduler.py ファイルの表示

@@ -172,9 +172,13 @@ def start_scheduler():
@click.command('worker')
@click.option('--queue', type=str)
@click.option('--quiet', is_flag = True, default = False, help = 'Hide Log Outputs')
def start_worker(queue, quiet = False):
@click.option('-u', '--rq-username', default=None, help='Redis ACL user')
@click.option('-p', '--rq-password', default=None, help='Redis ACL user password')
def start_worker(queue, quiet = False, rq_username=None, rq_password=None):
"""Site is used to find redis credentals.
"""
from frappe.utils.background_jobs import start_worker
start_worker(queue, quiet = quiet)
start_worker(queue, quiet = quiet, rq_username=rq_username, rq_password=rq_password)

@click.command('ready-for-migration')
@click.option('--site', help='site name')


+ 4
- 4
frappe/core/page/background_jobs/background_jobs.py ファイルの表示

@@ -4,12 +4,12 @@
import json
from typing import TYPE_CHECKING, Dict, List

from rq import Queue, Worker
from rq import Worker

import frappe
from frappe import _
from frappe.utils import convert_utc_to_user_timezone, format_datetime
from frappe.utils.background_jobs import get_redis_conn
from frappe.utils.background_jobs import get_redis_conn, get_queues
from frappe.utils.scheduler import is_scheduler_inactive

if TYPE_CHECKING:
@@ -29,7 +29,7 @@ def get_info(show_failed=False) -> List[Dict]:
show_failed = json.loads(show_failed)

conn = get_redis_conn()
queues = Queue.all(conn)
queues = get_queues()
workers = Worker.all(conn)
jobs = []

@@ -75,7 +75,7 @@ def get_info(show_failed=False) -> List[Dict]:
@frappe.whitelist()
def remove_failed_jobs():
conn = get_redis_conn()
queues = Queue.all(conn)
queues = get_queues()
for queue in queues:
fail_registry = queue.failed_job_registry
for job_id in fail_registry.get_job_ids():


+ 1
- 0
frappe/test_runner.py ファイルの表示

@@ -56,6 +56,7 @@ def main(app=None, module=None, doctype=None, verbose=False, tests=(),
frappe.clear_cache()
frappe.utils.scheduler.disable_scheduler()
set_test_email_config()
frappe.conf.update({'bench_id': 'test_bench', 'use_rq_auth': False})

if not frappe.flags.skip_before_tests:
if verbose:


+ 3
- 3
frappe/tests/test_background_jobs.py ファイルの表示

@@ -4,7 +4,7 @@ from rq import Queue

import frappe
from frappe.core.page.background_jobs.background_jobs import remove_failed_jobs
from frappe.utils.background_jobs import get_redis_conn
from frappe.utils.background_jobs import get_redis_conn, rename_queue
import time


@@ -17,14 +17,14 @@ class TestBackgroundJobs(unittest.TestCase):
queues = Queue.all(conn)

for queue in queues:
if queue.name == "short":
if queue.name == rename_queue("short"):
fail_registry = queue.failed_job_registry
self.assertGreater(fail_registry.count, 0)

remove_failed_jobs()

for queue in queues:
if queue.name == "short":
if queue.name == rename_queue("short"):
fail_registry = queue.failed_job_registry
self.assertEqual(fail_registry.count, 0)



+ 70
- 0
frappe/tests/test_redis.py ファイルの表示

@@ -0,0 +1,70 @@
import unittest
import functools

import redis

import frappe
from frappe.utils import get_bench_id
from frappe.utils.rq import RedisQueue
from frappe.utils.background_jobs import get_redis_conn

def version_tuple(version):
return tuple(map(int, (version.split("."))))

def skip_if_redis_version_lt(version):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
conn = get_redis_conn()
redis_version = conn.execute_command('info')['redis_version']
if version_tuple(redis_version) < version_tuple(version):
return
return func(*args, **kwargs)
return wrapper
return decorator

class TestRedisAuth(unittest.TestCase):
@skip_if_redis_version_lt('6.0')
def test_rq_gen_acllist(self):
"""Make sure that ACL list is genrated
"""
acl_list = RedisQueue.gen_acl_list()
self.assertEqual(acl_list[1]['bench'][0], get_bench_id())

@skip_if_redis_version_lt('6.0')
def test_adding_redis_user(self):
acl_list = RedisQueue.gen_acl_list()
username, password = acl_list[1]['bench']
conn = get_redis_conn()

conn.acl_deluser(username)
_ = RedisQueue(conn).add_user(username, password)
self.assertTrue(conn.acl_getuser(username))
conn.acl_deluser(username)

@skip_if_redis_version_lt('6.0')
def test_rq_namespace(self):
"""Make sure that user can access only their respective namespace.
"""
# Current bench ID
bench_id = frappe.conf.get('bench_id')
conn = get_redis_conn()
conn.set('rq:queue:test_bench1:abc', 'value')
conn.set(f'rq:queue:{bench_id}:abc', 'value')

# Create new Redis Queue user
tmp_bench_id = 'test_bench1'
username, password = tmp_bench_id, 'password1'
conn.acl_deluser(username)
frappe.conf.update({'bench_id': tmp_bench_id})
_ = RedisQueue(conn).add_user(username, password)
test_bench1_conn = RedisQueue.get_connection(username, password)

self.assertEqual(test_bench1_conn.get('rq:queue:test_bench1:abc'), b'value')

# User should not be able to access queues apart from their bench queues
with self.assertRaises(redis.exceptions.NoPermissionError):
test_bench1_conn.get(f'rq:queue:{bench_id}:abc')

frappe.conf.update({'bench_id': bench_id})
conn.acl_deluser(username)

+ 1
- 1
frappe/utils/__init__.py ファイルの表示

@@ -384,7 +384,7 @@ def get_bench_path():
return os.path.realpath(os.path.join(os.path.dirname(frappe.__file__), '..', '..', '..'))

def get_bench_id():
return frappe.local.conf.get('bench_id', 'DefaultBench')
return frappe.get_conf().get('bench_id', 'DefaultBench')

def get_site_id(site=None):
return f"{site or frappe.local.site}@{get_bench_id()}"


+ 63
- 17
frappe/utils/background_jobs.py ファイルの表示

@@ -1,13 +1,21 @@
import os
import socket
import time
from uuid import uuid4
from collections import defaultdict


import redis
from typing import List
from rq import Connection, Queue, Worker
from rq.logutils import setup_loghandlers
from frappe.utils import cstr
from collections import defaultdict

import frappe
import os, socket, time
from frappe import _
from uuid import uuid4
import frappe.monitor
from frappe.utils import cstr, get_bench_id
from frappe.utils.rq import RedisQueue
from frappe.utils.commands import log


default_timeout = 300
@@ -131,21 +139,22 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True,
if is_async:
frappe.destroy()

def start_worker(queue=None, quiet = False):
def start_worker(queue=None, quiet = False, rq_username=None, rq_password=None):
'''Wrapper to start rq worker. Connects to redis and monitors these queues.'''
with frappe.init_site():
# empty init is required to get redis_queue from common_site_config.json
redis_connection = get_redis_conn()
redis_connection = get_redis_conn(username=rq_username, password=rq_password)
queues = get_queue_list(queue, build_queue_name=True)
queue_name = queue and rename_queue(queue)

if os.environ.get('CI'):
setup_loghandlers('ERROR')

with Connection(redis_connection):
queues = get_queue_list(queue)
logging_level = "INFO"
if quiet:
logging_level = "WARNING"
Worker(queues, name=get_worker_name(queue)).work(logging_level = logging_level)
Worker(queues, name=get_worker_name(queue_name)).work(logging_level = logging_level)

def get_worker_name(queue):
'''When limiting worker to a specific queue, also append queue name to default worker name'''
@@ -186,7 +195,7 @@ def get_jobs(site=None, queue=None, key='method'):

return jobs_per_site

def get_queue_list(queue_list=None):
def get_queue_list(queue_list=None, build_queue_name=False):
'''Defines possible queues. Also wraps a given queue in a list after validating.'''
default_queue_list = list(queue_timeout)
if queue_list:
@@ -195,11 +204,9 @@ def get_queue_list(queue_list=None):

for queue in queue_list:
validate_queue(queue, default_queue_list)

return queue_list

else:
return default_queue_list
queue_list = default_queue_list
return [rename_queue(q) for q in queue_list] if build_queue_name else queue_list

def get_workers(queue):
'''Returns a list of Worker objects tied to a queue object'''
@@ -218,7 +225,7 @@ def get_running_jobs_in_queue(queue):
def get_queue(queue, is_async=True):
'''Returns a Queue object tied to a redis connection'''
validate_queue(queue)
return Queue(queue, connection=get_redis_conn(), is_async=is_async)
return Queue(rename_queue(queue), connection=get_redis_conn(), is_async=is_async)

def validate_queue(queue, default_queue_list=None):
if not default_queue_list:
@@ -227,7 +234,7 @@ def validate_queue(queue, default_queue_list=None):
if queue not in default_queue_list:
frappe.throw(_("Queue should be one of {0}").format(', '.join(default_queue_list)))

def get_redis_conn():
def get_redis_conn(username=None, password=None):
if not hasattr(frappe.local, 'conf'):
raise Exception('You need to call frappe.init')

@@ -236,11 +243,50 @@ def get_redis_conn():

global redis_connection

if not redis_connection:
redis_connection = redis.from_url(frappe.local.conf.redis_queue)
cred = frappe._dict()
if frappe.conf.get('use_rq_auth'):
if username:
cred['username'] = username
cred['password'] = password
else:
cred['username'] = frappe.get_site_config().rq_username or get_bench_id()
cred['password'] = frappe.get_site_config().rq_password

elif os.environ.get('RQ_ADMIN_PASWORD'):
cred['username'] = 'default'
cred['password'] = os.environ.get('RQ_ADMIN_PASWORD')
try:
redis_connection = RedisQueue.get_connection(**cred)
except (redis.exceptions.AuthenticationError, redis.exceptions.ResponseError):
log(f'Wrong credentials used for {cred.username or "default user"}. '
'You can reset credentials using `bench create-rq-users` CLI and restart the server',
colour='red')
raise
except Exception:
log(f'Please make sure that Redis Queue runs @ {frappe.get_conf().redis_queue}', colour='red')
raise

return redis_connection

def get_queues() -> List[Queue]:
"""Get all the queues linked to the current bench.
"""
queues = Queue.all(connection=get_redis_conn())
return [q for q in queues if is_queue_accessible(q)]

def rename_queue(qname: str) -> str:
"""Rename qname by adding bench name as prefix.

Renamed queues are useful to define namespaces of customers.
"""
return f"{get_bench_id()}:{qname}"

def is_queue_accessible(qobj: Queue) -> bool:
"""Checks whether queue is relate to current bench or not.
"""
accessible_queues = [rename_queue(q) for q in list(queue_timeout)]
return qobj.name in accessible_queues

def enqueue_test_job():
enqueue('frappe.utils.background_jobs.test_job', s=100)



+ 10
- 14
frappe/utils/rq.py ファイルの表示

@@ -1,8 +1,7 @@
import redis

import frappe
from frappe.utils import get_site_id, get_bench_id, random_string

from frappe.utils import get_bench_id, random_string

class RedisQueue:
def __init__(self, conn):
@@ -17,9 +16,10 @@ class RedisQueue:
return frappe._dict(user_settings) if is_created else {}

@classmethod
def get_connection(cls, username='default', password=None):
domain = frappe.local.conf.redis_queue.split("redis://", 1)[-1]
url = f"redis://{username}:{password or ''}@{domain}"
def get_connection(cls, username=None, password=None):
rq_url = frappe.local.conf.redis_queue
domain = rq_url.split("redis://", 1)[-1]
url = (username and f"redis://{username}:{password or ''}@{domain}") or rq_url
conn = redis.from_url(url)
conn.ping()
return conn
@@ -63,25 +63,21 @@ class RedisQueue:
return ['+@all', '-@admin']

@classmethod
def gen_acl_list(cls, reset_passwords=False, set_admin_password=False):
def gen_acl_list(cls, set_admin_password=False):
"""Generate list of ACL users needed for this branch.

This list contains default ACL user and the bench ACL user(used by all sites incase of ACL is enabled).
"""
with frappe.init_site():
bench_username = get_bench_id()
bench_user_rules = cls.get_acl_key_rules(include_key_prefix=True) + cls.get_acl_command_rules()

bench_username = get_bench_id()
bench_user_rules = cls.get_acl_key_rules(include_key_prefix=True) + cls.get_acl_command_rules()
bench_user_rule_str = ' '.join(bench_user_rules).strip()
bench_user_password = random_string(20)
bench_user_resetpass = (reset_passwords and 'resetpass') or ''

default_username = 'default'
_default_user_password = random_string(20) if set_admin_password else ''
default_user_password = '>'+_default_user_password if _default_user_password else 'nopass'
default_user_resetpass = (reset_passwords and set_admin_password and 'resetpass') or ''

return [
f'user {default_username} on {default_user_password} {default_user_resetpass} ~* &* +@all',
f'user {bench_username} on >{bench_user_password} {bench_user_resetpass} {bench_user_rule_str}'
f'user {default_username} on {default_user_password} ~* &* +@all',
f'user {bench_username} on >{bench_user_password} {bench_user_rule_str}'
], {'bench': (bench_username, bench_user_password), 'default': (default_username, _default_user_password)}

読み込み中…
キャンセル
保存