|
|
@@ -145,7 +145,7 @@ def start_worker(queue=None, quiet = False, rq_username=None, rq_password=None): |
|
|
|
# empty init is required to get redis_queue from common_site_config.json |
|
|
|
redis_connection = get_redis_conn(username=rq_username, password=rq_password) |
|
|
|
queues = get_queue_list(queue, build_queue_name=True) |
|
|
|
queue_name = queue and rename_queue(queue) |
|
|
|
queue_name = queue and generate_qname(queue) |
|
|
|
|
|
|
|
if os.environ.get('CI'): |
|
|
|
setup_loghandlers('ERROR') |
|
|
@@ -206,7 +206,7 @@ def get_queue_list(queue_list=None, build_queue_name=False): |
|
|
|
validate_queue(queue, default_queue_list) |
|
|
|
else: |
|
|
|
queue_list = default_queue_list |
|
|
|
return [rename_queue(q) for q in queue_list] if build_queue_name else queue_list |
|
|
|
return [generate_qname(qtype) for qtype in queue_list] if build_queue_name else queue_list |
|
|
|
|
|
|
|
def get_workers(queue): |
|
|
|
'''Returns a list of Worker objects tied to a queue object''' |
|
|
@@ -222,10 +222,10 @@ def get_running_jobs_in_queue(queue): |
|
|
|
jobs.append(current_job) |
|
|
|
return jobs |
|
|
|
|
|
|
|
def get_queue(queue, is_async=True): |
|
|
|
def get_queue(qtype, is_async=True): |
|
|
|
'''Returns a Queue object tied to a redis connection''' |
|
|
|
validate_queue(queue) |
|
|
|
return Queue(rename_queue(queue), connection=get_redis_conn(), is_async=is_async) |
|
|
|
validate_queue(qtype) |
|
|
|
return Queue(generate_qname(qtype), connection=get_redis_conn(), is_async=is_async) |
|
|
|
|
|
|
|
def validate_queue(queue, default_queue_list=None): |
|
|
|
if not default_queue_list: |
|
|
@@ -274,17 +274,17 @@ def get_queues() -> List[Queue]: |
|
|
|
queues = Queue.all(connection=get_redis_conn()) |
|
|
|
return [q for q in queues if is_queue_accessible(q)] |
|
|
|
|
|
|
|
def rename_queue(qname: str) -> str: |
|
|
|
"""Rename qname by adding bench name as prefix. |
|
|
|
def generate_qname(qtype: str) -> str: |
|
|
|
"""Generate qname by combining bench ID and queue type. |
|
|
|
|
|
|
|
Renamed queues are useful to define namespaces of customers. |
|
|
|
qnames are useful to define namespaces of customers. |
|
|
|
""" |
|
|
|
return f"{get_bench_id()}:{qname}" |
|
|
|
return f"{get_bench_id()}:{qtype}" |
|
|
|
|
|
|
|
def is_queue_accessible(qobj: Queue) -> bool: |
|
|
|
"""Checks whether queue is relate to current bench or not. |
|
|
|
""" |
|
|
|
accessible_queues = [rename_queue(q) for q in list(queue_timeout)] |
|
|
|
accessible_queues = [generate_qname(q) for q in list(queue_timeout)] |
|
|
|
return qobj.name in accessible_queues |
|
|
|
|
|
|
|
def enqueue_test_job(): |
|
|
|