# imports - standard imports
import json
import logging
import os
import re
import subprocess
import sys
from functools import lru_cache
from glob import glob
from pathlib import Path
from shlex import split
from tarfile import TarInfo
from typing import List, Optional, Tuple

# imports - third party imports
import click

# imports - module imports
from bench import PROJECT_NAME, VERSION
from bench.exceptions import (
    AppNotInstalledError,
    CommandFailedError,
    InvalidRemoteException,
)

logger = logging.getLogger(PROJECT_NAME)
# Files looked for (one directory level below the app root) to recognise an app.
paths_in_app = ("hooks.py", "modules.txt", "patches.txt")
# Sub-paths that must all exist for a directory to count as a bench.
paths_in_bench = ("apps", "sites", "config", "logs", "config/pids")
sudoers_file = "/etc/sudoers.d/xhiveframework"
# Sentinel object; distinct from None.
UNSET_ARG = object()


def is_bench_directory(directory=os.path.curdir):
    """Return True if every entry of ``paths_in_bench`` exists under *directory*."""
    return all(
        os.path.exists(os.path.abspath(os.path.join(directory, folder)))
        for folder in paths_in_bench
    )


def is_xhiveframework_app(directory: str) -> bool:
    """Return True if every file in ``paths_in_app`` is found under *directory*.

    Each file is searched with the pattern ``<directory>/**/<file>``. ``glob``
    is called without ``recursive=True``, so ``**`` matches a single path
    component, i.e. the files are expected exactly one directory level down.
    """
    return all(glob(os.path.join(directory, "**", folder)) for folder in paths_in_app)


def get_bench_cache_path(sub_dir: Optional[str]) -> Path:
    """Return (creating it if needed) the bench cache directory.

    :param sub_dir: optional sub directory of ``~/.cache/bench``; ignored when
            empty or when it starts with ``/``
    :return: the expanded cache path, guaranteed to exist
    """
    relative_path = "~/.cache/bench"
    if sub_dir and not sub_dir.startswith("/"):
        relative_path += f"/{sub_dir}"

    cache_path = Path(os.path.expanduser(relative_path))
    cache_path.mkdir(parents=True, exist_ok=True)
    return cache_path


@lru_cache(maxsize=None)
def is_valid_xhiveframework_branch(xhiveframework_path: str, xhiveframework_branch: str):
    """Check if a branch exists in a repo. Throws InvalidRemoteException if branch is not found

    Uses native git command to check for branches on a remote. Nothing is
    checked when ``xhiveframework_branch`` is falsy.

    :param xhiveframework_path: git url
    :type xhiveframework_path: str
    :param xhiveframework_branch: branch to check
    :type xhiveframework_branch: str
    :raises InvalidRemoteException: branch for this repo doesn't exist
    """
    from git.cmd import Git
    from git.exc import GitCommandError

    g = Git()

    if xhiveframework_branch:
        try:
            res = g.ls_remote("--heads", "--tags", xhiveframework_path, xhiveframework_branch)
            if not res:
                raise InvalidRemoteException(
                    f"Invalid branch or tag: {xhiveframework_branch} for the remote {xhiveframework_path}"
                )
        except GitCommandError as e:
            raise InvalidRemoteException(f"Invalid xhiveframework path: {xhiveframework_path}") from e


def log(message, level=0, no_log=False, stderr=False):
    """Print a coloured message and, unless *no_log* is set, also log it.

    :param message: text to emit
    :param level: 0 info, 1 success, 2 error, 3 warning; unknown values fall
            back to level 0
    :param no_log: when True only print the bare message (no prefix, no logger)
    :param stderr: passed to ``click.secho`` as ``err``
    """
    import bench
    import bench.cli

    levels = {
        0: ("blue", "INFO"),  # normal
        1: ("green", "SUCCESS"),  # success
        2: ("red", "ERROR"),  # fail
        3: ("yellow", "WARN"),  # warn/suggest
    }

    color, prefix = levels.get(level, levels[0])

    if bench.cli.from_command_line and bench.cli.dynamic_feed:
        bench.LOG_BUFFER.append({"prefix": prefix, "message": message, "color": color})

    if no_log:
        click.secho(message, fg=color, err=stderr)
    else:
        # Only error and warning have dedicated logger methods; the rest go to info.
        loggers = {2: logger.error, 3: logger.warning}
        level_logger = loggers.get(level, logger.info)

        level_logger(message)
        click.secho(f"{prefix}: {message}", fg=color, err=stderr)


def check_latest_version():
    """Tell the user (on stderr) when PyPI has a newer bench release.

    Does nothing for ``dev`` versions, when the request fails, or when PyPI
    does not answer with HTTP 200.
    """
    if VERSION.endswith("dev"):
        return

    import requests
    from semantic_version import Version

    try:
        # A timeout keeps this convenience check from hanging the CLI when
        # the network is unresponsive; a timeout is swallowed like any other
        # request failure below.
        pypi_request = requests.get("https://pypi.org/pypi/xhiveframework-bench/json", timeout=10)
    except Exception:
        # Exceptions thrown are defined in requests.exceptions
        # ignore checking on all Exceptions
        return

    if pypi_request.status_code == 200:
        pypi_version_str = pypi_request.json().get("info").get("version")
        pypi_version = Version(pypi_version_str)
        local_version = Version(VERSION)

        if pypi_version > local_version:
            log(
                f"A newer version of bench is available: {local_version} → {pypi_version}",
                stderr=True,
            )


def pause_exec(seconds=10):
    """Block for *seconds* seconds while printing a one-line countdown."""
    from time import sleep

    for i in range(seconds, 0, -1):
        print(f"Will continue execution in {i} seconds...", end="\r")
        sleep(1)

    # Overwrite the countdown line with blanks.
    print(" " * 40, end="\r")


def exec_cmd(cmd, cwd=".", env=None, _raise=True):
    """Run *cmd* (split with ``shlex.split``) and return its exit code.

    :param cmd: command line as a single string
    :param cwd: working directory for the command
    :param env: extra environment variables; when non-empty they are layered
            on top of the current process environment
    :param _raise: raise when the command exits with a non-zero code
    :raises CommandFailedError: non-zero exit code and ``_raise`` is True
    :return: the command's exit code
    """
    if env:
        # Build a fresh mapping: the caller's dict is left untouched and the
        # variables the caller asked for win over inherited ones.
        env = {**os.environ, **env}

    click.secho(f"$ {cmd}", fg="bright_black")

    cwd_info = f"cd {cwd} && " if cwd != "." else ""
    cmd_log = f"{cwd_info}{cmd}"
    logger.debug(cmd_log)
    spl_cmd = split(cmd)
    return_code = subprocess.call(spl_cmd, cwd=cwd, universal_newlines=True, env=env)
    if return_code:
        logger.warning(f"{cmd_log} executed with exit code {return_code}")
        if _raise:
            raise CommandFailedError(cmd) from subprocess.CalledProcessError(return_code, cmd)
    return return_code


def which(executable: str, raise_err: bool = False) -> Optional[str]:
    """Return the path of *executable* as found by ``shutil.which``.

    :raises FileNotFoundError: executable not found and ``raise_err`` is True
    :return: the resolved path, or None when not found and ``raise_err`` is False
    """
    from shutil import which

    exec_ = which(executable)

    if not exec_ and raise_err:
        raise FileNotFoundError(f"{executable} not found in PATH")

    return exec_


def setup_logging(bench_path=".") -> logging.Logger:
    """Configure and return the project logger.

    Registers a custom level named ``LOG`` (15) and attaches a file handler
    writing to ``<bench_path>/logs/bench.log`` when the ``logs`` directory
    exists, otherwise a ``NullHandler``.
    """
    LOG_LEVEL = 15
    logging.addLevelName(LOG_LEVEL, "LOG")

    def logv(self, message, *args, **kws):
        if self.isEnabledFor(LOG_LEVEL):
            self._log(LOG_LEVEL, message, args, **kws)

    # Replaces ``Logger.log`` for every logger in the process: after this,
    # ``some_logger.log(message)`` emits at the custom LOG level.
    logging.Logger.log = logv

    if os.path.exists(os.path.join(bench_path, "logs")):
        log_file = os.path.join(bench_path, "logs", "bench.log")
        hdlr = logging.FileHandler(log_file)
    else:
        hdlr = logging.NullHandler()

    logger = logging.getLogger(PROJECT_NAME)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr)
    logger.setLevel(logging.DEBUG)

    return logger


def get_process_manager() -> Optional[str]:
    """Return the path of the first of honcho, foreman, forego found in PATH.

    Returns None when none of them is installed.
    """
    for proc_man in ["honcho", "foreman", "forego"]:
        proc_man_path = which(proc_man)
        if proc_man_path:
            return proc_man_path


def get_git_version() -> float:
    """returns git version from `git --version`
    extracts version number from string `git version 1.9.1` etc

    Only the first two components are kept, e.g. ``1.9.1`` becomes ``1.9``.
    """
    version = get_cmd_output("git --version")
    version = version.strip().split()[2]
    version = ".".join(version.split(".")[0:2])
    return float(version)


def get_cmd_output(cmd, cwd=".", _raise=True):
    """Run *cmd* through the shell and return its stripped stdout.

    When the command fails, its captured stdout is returned if there is any;
    otherwise the ``CalledProcessError`` is re-raised when ``_raise`` is True
    and an empty string is returned when it is False.
    """
    output = ""
    try:
        output = subprocess.check_output(
            cmd, cwd=cwd, shell=True, stderr=subprocess.PIPE, encoding="utf-8"
        ).strip()
    except subprocess.CalledProcessError as e:
        if e.output:
            output = e.output
        elif _raise:
            raise
    return output


def is_root():
    """Return True when the current process runs with uid 0."""
    return os.getuid() == 0


def run_xhiveframework_cmd(*args, **kwargs):
    """Run ``xhiveframework.utils.bench_helper xhiveframework <args>`` in the bench env.

    The command runs from ``<bench_path>/sites`` (``bench_path`` keyword,
    default ``"."``). When bench was not started from the command line the
    child's stdout/stderr are piped and relayed through ``print_output``.
    Exits the current process with the child's code when it is positive.
    """
    from bench.cli import from_command_line
    from bench.utils.bench import get_env_cmd

    bench_path = kwargs.get("bench_path", ".")
    f = get_env_cmd("python", bench_path=bench_path)
    sites_dir = os.path.join(bench_path, "sites")

    is_async = not from_command_line
    if is_async:
        stderr = stdout = subprocess.PIPE
    else:
        stderr = stdout = None

    p = subprocess.Popen(
        (f, "-m", "xhiveframework.utils.bench_helper", "xhiveframework") + args,
        cwd=sites_dir,
        stdout=stdout,
        stderr=stderr,
    )

    return_code = print_output(p) if is_async else p.wait()
    if return_code > 0:
        sys.exit(return_code)


def print_output(p):
    """Relay the piped stdout/stderr of process *p* until it exits.

    :param p: a ``subprocess.Popen`` created with piped stdout and stderr
    :return: the process' exit code
    """
    # NOTE(review): run_xhiveframework_cmd opens the pipes without text mode,
    # so read(1)/readline() presumably return bytes here; bytes never compare
    # equal to "\r"/"\n" and cannot be written to sys.stdout/sys.stderr.
    # Confirm and fix together with the caller.
    from select import select

    while p.poll() is None:
        readx = select([p.stdout.fileno(), p.stderr.fileno()], [], [])[0]
        send_buffer = []
        for fd in readx:
            if fd == p.stdout.fileno():
                while 1:
                    buf = p.stdout.read(1)
                    if not len(buf):
                        break
                    if buf == "\r" or buf == "\n":
                        # Flush one complete line, terminator included.
                        send_buffer.append(buf)
                        log_line("".join(send_buffer), "stdout")
                        send_buffer = []
                    else:
                        send_buffer.append(buf)

            if fd == p.stderr.fileno():
                log_line(p.stderr.readline(), "stderr")
    return p.poll()


def log_line(data, stream):
    """Write *data* to ``sys.stderr`` when *stream* is ``"stderr"``, else to ``sys.stdout``."""
    if stream == "stderr":
        return sys.stderr.write(data)
    return sys.stdout.write(data)


def get_bench_name(bench_path):
    """Return the last path component of the absolute *bench_path*."""
    return os.path.basename(os.path.abspath(bench_path))


def set_git_remote_url(git_url, bench_path="."):
    """Set app remote git url

    The app name is derived from the last path segment of *git_url* with its
    final extension removed. The ``upstream`` remote is only updated when the
    app directory contains a ``.git`` entry.

    :raises AppNotInstalledError: the derived app is not part of the bench
    """
    from bench.app import get_repo_dir
    from bench.bench import Bench

    app = git_url.rsplit("/", 1)[1].rsplit(".", 1)[0]

    if app not in Bench(bench_path).apps:
        raise AppNotInstalledError(f"No app named {app}")

    app_dir = get_repo_dir(app, bench_path=bench_path)

    if os.path.exists(os.path.join(app_dir, ".git")):
        exec_cmd(f"git remote set-url upstream {git_url}", cwd=app_dir)


def run_playbook(playbook_name, extra_vars=None, tag=None):
    """Run an ansible playbook from bench's ``playbooks`` directory locally.

    Exits with status 1 when ``ansible`` is not found in PATH.

    :param playbook_name: playbook file, relative to the ``playbooks`` directory
    :param extra_vars: optional mapping passed as JSON via ``-e``
    :param tag: optional tag passed via ``-t``
    """
    import bench

    if not which("ansible"):
        print(
            "Ansible is needed to run this command, please install it using 'pip"
            " install ansible'"
        )
        sys.exit(1)
    args = ["ansible-playbook", "-c", "local", playbook_name, "-vvvv"]

    if extra_vars:
        args.extend(["-e", json.dumps(extra_vars)])

    if tag:
        args.extend(["-t", tag])

    subprocess.check_call(args, cwd=os.path.join(bench.__path__[0], "playbooks"))


def find_benches(directory: str = None) -> List:
    """Recursively look for bench directories below *directory*.

    :param directory: where to start; defaults to the user's home directory.
            Exits with status 1 when it does not exist.
    :return: list of bench paths found in sub directories (symlinked
            directories are skipped). When *directory* itself is a bench a
            message is printed and None is returned.
    """
    if not directory:
        directory = os.path.expanduser("~")
    elif os.path.exists(directory):
        directory = os.path.abspath(directory)
    else:
        log("Directory doesn't exist", level=2)
        sys.exit(1)

    if is_bench_directory(directory):
        # ``directory`` is absolute at this point, so it has to be compared
        # with the absolute current directory rather than with ".".
        if os.path.abspath(os.path.curdir) == directory:
            print("You are in a bench directory!")
        else:
            print(f"{directory} is a bench directory!")
        return

    benches = []

    try:
        sub_directories = os.listdir(directory)
    except PermissionError:
        return benches

    for sub in sub_directories:
        sub = os.path.join(directory, sub)
        if os.path.isdir(sub) and not os.path.islink(sub):
            if is_bench_directory(sub):
                print(f"{sub} found!")
                benches.append(sub)
            else:
                benches.extend(find_benches(sub))

    return benches


def is_dist_editable(dist: str) -> bool:
    """Is distribution an editable install?

    True when a ``<dist>.egg-link`` file exists in any ``sys.path`` entry.
    """
    return any(
        os.path.isfile(os.path.join(path_item, f"{dist}.egg-link")) for path_item in sys.path
    )


def find_parent_bench(path: str) -> Optional[str]:
    """Checks if parent directories are benches

    Walks up from *path* and returns the first directory that is a bench.
    The walk stops (returning None) at the user's home directory or at the
    filesystem root.
    """
    if is_bench_directory(directory=path):
        return path

    home_path = os.path.expanduser("~")
    root_path = os.path.abspath(os.sep)

    if path not in {home_path, root_path}:
        # NOTE: the os.path.split assumes that given path is absolute
        parent_dir = os.path.split(path)[0]
        return find_parent_bench(parent_dir)


def get_env_xhiveframework_commands(bench_path=".") -> List:
    """Return the commands reported by the bench's Xhiveframework installation.

    Runs ``bench_helper get-xhiveframework-commands`` with the bench's python
    from ``<bench_path>/sites`` and parses its output as JSON. When the
    command fails with ``CalledProcessError`` its stderr is printed and an
    empty list is returned.
    """
    from bench.utils.bench import get_env_cmd

    python = get_env_cmd("python", bench_path=bench_path)
    sites_path = os.path.join(bench_path, "sites")

    try:
        return json.loads(
            get_cmd_output(
                f"{python} -m xhiveframework.utils.bench_helper get-xhiveframework-commands", cwd=sites_path
            )
        )
    except subprocess.CalledProcessError as e:
        if hasattr(e, "stderr"):
            print(e.stderr)

    return []


def find_org(org_repo, using_cached: bool=False):
    """Find which known organisation hosts a repository.

    :param org_repo: sequence whose first item is the repository name
    :param using_cached: return ``("", repo)`` instead of raising when the
            repository is not found
    :raises InvalidRemoteException: repository not found and ``using_cached``
            is False
    :return: ``(org, repo)``
    """
    import requests

    org_repo = org_repo[0]

    for org in ["xhiveframework", "xhiveerp"]:
        res = requests.head(f"https://api.github.com/repos/{org}/{org_repo}")
        # On a 400/403 from the API, retry against the second host.
        if res.status_code in (400, 403):
            res = requests.head(f"https://lab.membtech.com/{org}/{org_repo}")
        if res.ok:
            return org, org_repo

    if using_cached:
        return "", org_repo

    raise InvalidRemoteException(f"{org_repo} not found under xhiveframework or xhiveerp GitHub accounts")


def fetch_details_from_tag(_tag: str, using_cached: bool=False) -> Tuple[str, str, str]:
    """Split ``[org/]repo[@tag]`` into ``(org, repo, tag)``.

    ``tag`` is None when no ``@tag`` suffix is given. When the org is
    missing it is looked up with ``find_org``.

    :raises Exception: ``_tag`` is empty
    """
    if not _tag:
        raise Exception("Tag is not provided")

    app_tag = _tag.split("@")
    org_repo = app_tag[0].split("/")

    try:
        repo, tag = app_tag
    except ValueError:
        repo, tag = app_tag + [None]

    try:
        org, repo = org_repo
    except Exception:
        org, repo = find_org(org_repo, using_cached)

    return org, repo, tag


def is_git_url(url: str) -> bool:
    """Return True when *url* matches the git URL pattern below."""
    # modified to allow without the tailing .git from https://github.com/jonschlinkert/is-git-url.git
    pattern = r"(?:git|ssh|https?|\w*@[-\w.]+):(\/\/)?(.*?)(\.git)?(\/?|\#[-\d\w._]+?)$"
    return bool(re.match(pattern, url))


def drop_privileges(uid_name="nobody", gid_name="nogroup"):
    """Switch the process from root to the given user and group.

    Does nothing when the process is not running as root.
    """
    import grp
    import pwd

    # from http://stackoverflow.com/a/2699996
    if os.getuid() != 0:
        # We're not root so, like, whatever dude
        return

    # Get the uid/gid from the name
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask
    os.umask(0o22)


def get_available_folder_name(name: str, path: str) -> str:
    """Return a folder name that does not exist yet under *path*.

    *name* is returned unchanged when it is free. Otherwise the first free
    candidate among ``name_1`` .. ``name_99`` is returned; when all of them
    are taken, *name* itself is returned even though it exists.
    """
    if os.path.exists(os.path.join(path, name)):
        for num in range(1, 100):
            _dt = f"{name}_{num}"
            if not os.path.exists(os.path.join(path, _dt)):
                return _dt
    return name


def get_traceback() -> str:
    """Returns the traceback of the Exception

    Returns an empty string when no exception is being handled.
    """
    from traceback import format_exception

    exc_type, exc_value, exc_tb = sys.exc_info()

    if not any([exc_type, exc_value, exc_tb]):
        return ""

    trace_list = format_exception(exc_type, exc_value, exc_tb)
    return "".join(trace_list)


class _dict(dict):
    """dict like object that exposes keys as attributes"""

    # bench port of xhiveframework._dict
    def __getattr__(self, key):
        """Return ``self.get(key)``; missing non-dunder keys yield None."""
        ret = self.get(key)
        # "__deepcopy__" exception added to fix xhiveframework#14833 via DFP
        if not ret and key.startswith("__") and key != "__deepcopy__":
            raise AttributeError(key)
        return ret

    def __setattr__(self, key, value):
        """Store attribute assignments as dict items."""
        self[key] = value

    def __getstate__(self):
        return self

    def __setstate__(self, d):
        self.update(d)

    def update(self, d):
        """update and return self -- the missing dict feature in python"""
        super().update(d)
        return self

    def copy(self):
        """Return a shallow copy that is again a ``_dict``."""
        return _dict(dict(self).copy())


def get_cmd_from_sysargv():
    """Identify and segregate tokens to options and command

    For Command: `bench --profile --site xhiveframework.com migrate --no-backup`
    sys.argv: ["/home/xhiveframework/.local/bin/bench", "--profile", "--site", "xhiveframework.com", "migrate", "--no-backup"]
    Actual command run: migrate

    :return: the first token that is neither a known flag, a known parameter
            (or its value), nor an app name in first position; None when
            there is no such token
    """
    # context is passed as options to xhiveframework's bench_helper
    from bench.bench import Bench

    xhiveframework_context = _dict(params={"--site"}, flags={"--verbose", "--profile", "--force"})
    cmd_from_ctx = None
    sys_argv = sys.argv[1:]
    skip_next = False

    # enumerate gives the real position of the token; list.index() would
    # report 0 for any later token that merely equals the first one.
    for position, arg in enumerate(sys_argv):
        if skip_next:
            # This token is the value of the preceding parameter.
            skip_next = False
            continue

        if arg in xhiveframework_context.flags:
            continue

        elif arg in xhiveframework_context.params:
            skip_next = True
            continue

        # A leading app name (``bench <app> <command>``) is not the command.
        if position == 0 and arg in Bench(".").apps:
            continue

        cmd_from_ctx = arg

        break

    return cmd_from_ctx


def get_app_cache_extract_filter(
    count_threshold: int = 10_000,
    size_threshold: int = 1_000_000_000,
):  # -> Callable[[TarInfo, str], TarInfo | None]
    """Build a ``tarfile`` extraction filter with entry-count and size limits.

    The returned callable keeps running totals across calls and delegates to
    ``tarfile.data_filter`` when the running Python provides it.

    :param count_threshold: maximum number of members allowed
    :param size_threshold: maximum cumulative ``member.size`` allowed
    :return: ``filter_function(member, dest_path)`` which returns the member
            to extract or None to skip it, and raises ``RuntimeError`` once a
            threshold is exceeded
    """
    state = dict(count=0, size=0)

    # ``data_filter`` and ``AbsoluteLinkError`` only exist on Python 3.12+ and
    # on patch releases carrying the security backport, so probe for them
    # instead of trusting the minor version number.
    try:
        from tarfile import AbsoluteLinkError, data_filter
    except ImportError:
        AbsoluteLinkError = Exception

        def data_filter(m: TarInfo, _: str) -> TarInfo:
            # Fallback for interpreters without extraction filters: pass through.
            return m

    def filter_function(member: TarInfo, dest_path: str) -> Optional[TarInfo]:
        state["count"] += 1
        state["size"] += member.size

        if state["count"] > count_threshold:
            raise RuntimeError(f"Number of entries exceeds threshold ({state['count']})")

        if state["size"] > size_threshold:
            raise RuntimeError(f"Extracted size exceeds threshold ({state['size']})")

        try:
            return data_filter(member, dest_path)
        except AbsoluteLinkError:
            # Links created by `xhiveframework` after extraction
            return None

    return filter_function