|
|
@@ -1,6 +1,7 @@ |
|
|
|
import os |
|
|
|
import socket |
|
|
|
import time |
|
|
|
from functools import lru_cache |
|
|
|
from uuid import uuid4 |
|
|
|
from collections import defaultdict |
|
|
|
from typing import List |
|
|
@@ -20,18 +21,22 @@ from frappe.utils.redis_queue import RedisQueue |
|
|
|
from frappe.utils.commands import log |
|
|
|
|
|
|
|
|
|
|
|
common_site_config = frappe.get_file_json("common_site_config.json") |
|
|
|
custom_workers_config = common_site_config.get("workers", {}) |
|
|
|
default_timeout = 300 |
|
|
|
queue_timeout = { |
|
|
|
"default": default_timeout, |
|
|
|
"short": default_timeout, |
|
|
|
"long": 1500, |
|
|
|
**{ |
|
|
|
worker: config.get("timeout", default_timeout) |
|
|
|
for worker, config in custom_workers_config.items() |
|
|
|
|
|
|
|
@lru_cache() |
|
|
|
def get_queues_timeout(): |
|
|
|
common_site_config = frappe.get_conf() |
|
|
|
custom_workers_config = common_site_config.get("workers", {}) |
|
|
|
default_timeout = 300 |
|
|
|
|
|
|
|
return { |
|
|
|
"default": default_timeout, |
|
|
|
"short": default_timeout, |
|
|
|
"long": 1500, |
|
|
|
**{ |
|
|
|
worker: config.get("timeout", default_timeout) |
|
|
|
for worker, config in custom_workers_config.items() |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
redis_connection = None |
|
|
|
|
|
|
@@ -57,7 +62,7 @@ def enqueue(method, queue='default', timeout=None, event=None, |
|
|
|
|
|
|
|
q = get_queue(queue, is_async=is_async) |
|
|
|
if not timeout: |
|
|
|
timeout = queue_timeout.get(queue) or 300 |
|
|
|
timeout = get_queues_timeout().get(queue) or 300 |
|
|
|
queue_args = { |
|
|
|
"site": frappe.local.site, |
|
|
|
"user": frappe.session.user, |
|
|
@@ -204,7 +209,7 @@ def get_jobs(site=None, queue=None, key='method'): |
|
|
|
|
|
|
|
def get_queue_list(queue_list=None, build_queue_name=False): |
|
|
|
'''Defines possible queues. Also wraps a given queue in a list after validating.''' |
|
|
|
default_queue_list = list(queue_timeout) |
|
|
|
default_queue_list = list(get_queues_timeout()) |
|
|
|
if queue_list: |
|
|
|
if isinstance(queue_list, str): |
|
|
|
queue_list = [queue_list] |
|
|
@@ -236,7 +241,7 @@ def get_queue(qtype, is_async=True): |
|
|
|
|
|
|
|
def validate_queue(queue, default_queue_list=None): |
|
|
|
if not default_queue_list: |
|
|
|
default_queue_list = list(queue_timeout) |
|
|
|
default_queue_list = list(get_queues_timeout()) |
|
|
|
|
|
|
|
if queue not in default_queue_list: |
|
|
|
frappe.throw(_("Queue should be one of {0}").format(', '.join(default_queue_list))) |
|
|
@@ -296,7 +301,7 @@ def generate_qname(qtype: str) -> str: |
|
|
|
def is_queue_accessible(qobj: Queue) -> bool: |
|
|
|
"""Checks whether queue is relate to current bench or not. |
|
|
|
""" |
|
|
|
accessible_queues = [generate_qname(q) for q in list(queue_timeout)] |
|
|
|
accessible_queues = [generate_qname(q) for q in list(get_queues_timeout())] |
|
|
|
return qobj.name in accessible_queues |
|
|
|
|
|
|
|
def enqueue_test_job(): |
|
|
|