From 4740e4f7e6579b8d838336c41bb97ce713e3b55b Mon Sep 17 00:00:00 2001 From: Aditya Hase Date: Thu, 5 Mar 2020 15:05:40 +0530 Subject: [PATCH] refactor: Monitor Do not collect request headers Collect job wait time and whether it was scheduled --- frappe/app.py | 2 +- frappe/monitor.py | 119 ++++++++++++++++---------------- frappe/tests/test_monitor.py | 58 ++++++++++------ frappe/utils/background_jobs.py | 2 +- 4 files changed, 100 insertions(+), 81 deletions(-) diff --git a/frappe/app.py b/frappe/app.py index e3ac2f495d..24ce35b514 100644 --- a/frappe/app.py +++ b/frappe/app.py @@ -93,7 +93,7 @@ def application(request): if response and hasattr(frappe.local, 'cookie_manager'): frappe.local.cookie_manager.flush_cookies(response=response) - frappe.monitor.stop() + frappe.monitor.stop(response) frappe.recorder.dump() frappe.destroy() diff --git a/frappe/monitor.py b/frappe/monitor.py index 794bac1390..7181bd92ad 100644 --- a/frappe/monitor.py +++ b/frappe/monitor.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors # MIT License. See license.txt from __future__ import unicode_literals @@ -10,21 +10,21 @@ import traceback import frappe import os import uuid +import rq MONITOR_REDIS_KEY = "monitor-transactions" +MONITOR_MAX_ENTRIES = 1000000 def start(transaction_type="request", method=None, kwargs=None): if frappe.conf.monitor: - frappe.local.monitor = Monitor( - transaction_type=transaction_type, method=method, kwargs=kwargs - ) + frappe.local.monitor = Monitor(transaction_type, method, kwargs) -def stop(): +def stop(response=None): if frappe.conf.monitor and hasattr(frappe.local, "monitor"): - frappe.local.monitor.dump() + frappe.local.monitor.dump(response) def log_file(): @@ -32,75 +32,76 @@ def log_file(): class Monitor: - def __init__(self, transaction_type=None, method=None, kwargs=None): + def __init__(self, transaction_type, method, kwargs): try: - self.site = frappe.local.site - self.timestamp = datetime.utcnow() - self.transaction_type = transaction_type - self.uuid = uuid.uuid4() - - if self.transaction_type == "request": - self.data = frappe.form_dict - self.headers = dict(frappe.request.headers) - self.ip = frappe.local.request_ip - self.method = frappe.request.method - self.path = frappe.request.path + self.data = frappe._dict( + { + "site": frappe.local.site, + "timestamp": datetime.utcnow(), + "transaction_type": transaction_type, + "uuid": str(uuid.uuid4()), + } + ) + + if transaction_type == "request": + self.collect_request_meta() else: - self.kwargs = kwargs - self.method = method + self.collect_job_meta(method, kwargs) except Exception: traceback.print_exc() - def dump(self): + def collect_request_meta(self): + self.data.request = frappe._dict( + { + "ip": frappe.local.request_ip, + "method": frappe.request.method, + "path": frappe.request.path, + } + ) + + def collect_job_meta(self, method, kwargs): + self.data.job = frappe._dict({"method": method, "scheduled": False, "wait": 0}) + if "run_scheduled_job" in method: + self.data.job.method = kwargs["job_type"] + self.data.job.scheduled = True + + job = rq.get_current_job() + if job: + self.data.uuid = job.id + waitdiff = self.data.timestamp - job.enqueued_at + self.data.job.wait = int(waitdiff.total_seconds() * 1000000) + + def dump(self, response=None): try: - timediff = datetime.utcnow() - self.timestamp + timediff = datetime.utcnow() - self.data.timestamp # Obtain duration in microseconds - self.duration = int(timediff.total_seconds() * 1000000) - data = { - "uuid": self.uuid, - "duration": self.duration, - "site": self.site, - "timestamp": self.timestamp.isoformat(sep=" "), - "transaction_type": self.transaction_type, - } + self.data.duration = int(timediff.total_seconds() * 1000000) - if self.transaction_type == "request": - update = { - "data": self.data, - "headers": self.headers, - "ip": self.ip, - "method": self.method, - "path": self.path, - } - else: - update = { - "kwargs": self.kwargs, - "method": self.method, - } - data.update(update) - json_data = json.dumps(data, sort_keys=True, default=str) - store(json_data) + if self.data.transaction_type == "request": + self.data.request.status_code = response.status_code + self.data.request.response_length = int(response.headers["Content-Length"]) + + self.store() except Exception: traceback.print_exc() - -def store(json_data): - MAX_LOGS = 1000000 - if frappe.cache().llen(MONITOR_REDIS_KEY) > MAX_LOGS: - frappe.cache().ltrim(MONITOR_REDIS_KEY, 1, -1) - frappe.cache().rpush(MONITOR_REDIS_KEY, json_data) + def store(self): + if frappe.cache().llen(MONITOR_REDIS_KEY) > MONITOR_MAX_ENTRIES: + frappe.cache().ltrim(MONITOR_REDIS_KEY, 1, -1) + serialized = json.dumps(self.data, sort_keys=True, default=str) + frappe.cache().rpush(MONITOR_REDIS_KEY, serialized) def flush(): try: # Fetch all the logs without removing from cache logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) - logs = list(map(frappe.safe_decode, logs)) - with open(log_file(), "a", os.O_NONBLOCK) as f: - f.write("\n".join(logs)) - f.write("\n") - - # Remove fetched entries from cache - frappe.cache().ltrim(MONITOR_REDIS_KEY, len(logs) - 1, -1) + if logs: + logs = list(map(frappe.safe_decode, logs)) + with open(log_file(), "a", os.O_NONBLOCK) as f: + f.write("\n".join(logs)) + f.write("\n") + # Remove fetched entries from cache + frappe.cache().ltrim(MONITOR_REDIS_KEY, len(logs) - 1, -1) except Exception: traceback.print_exc() diff --git a/frappe/tests/test_monitor.py b/frappe/tests/test_monitor.py index 9f655660d5..d99b324d84 100644 --- a/frappe/tests/test_monitor.py +++ b/frappe/tests/test_monitor.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright (c) 2019, Frappe Technologies Pvt. Ltd. and Contributors +# Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors # MIT License. See license.txt from __future__ import unicode_literals @@ -7,6 +7,7 @@ import unittest import frappe import frappe.monitor from frappe.utils import set_request +from frappe.utils.response import build_response from frappe.monitor import MONITOR_REDIS_KEY import json @@ -17,19 +18,47 @@ class TestMonitor(unittest.TestCase): frappe.cache().delete_value(MONITOR_REDIS_KEY) def test_enable_monitor(self): - set_request() + set_request(method="GET", path="/api/method/frappe.ping") + response = build_response("json") + frappe.monitor.start() - frappe.monitor.stop() + frappe.monitor.stop(response) logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) self.assertEqual(len(logs), 1) - log = json.loads(logs[0].decode()) - self.assertEqual(log["transaction_type"], "request") + + log = frappe.parse_json(logs[0].decode()) + self.assertTrue(log.duration) + self.assertTrue(log.site) + self.assertTrue(log.timestamp) + self.assertTrue(log.uuid) + self.assertTrue(log.request) + self.assertEqual(log.transaction_type, "request") + self.assertEqual(log.request["method"], "GET") + + # Reponse body will be set as "{}" + self.assertEqual(log.request["response_length"], 2) + self.assertEqual(log.request["status_code"], 200) + + def test_job(self): + frappe.utils.background_jobs.execute_job( + frappe.local.site, "frappe.ping", None, None, {}, is_async=False + ) + + logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) + self.assertEqual(len(logs), 1) + log = frappe.parse_json(logs[0].decode()) + self.assertEqual(log.transaction_type, "job") + self.assertTrue(log.job) + self.assertEqual(log.job["method"], "frappe.ping") + self.assertEqual(log.job["scheduled"], False) + self.assertEqual(log.job["wait"], 0) def test_flush(self): - set_request() + set_request(method="GET", path="/api/method/frappe.ping") + response = build_response("json") frappe.monitor.start() - frappe.monitor.stop() + frappe.monitor.stop(response) open(frappe.monitor.log_file(), "w").close() frappe.monitor.flush() @@ -38,19 +67,8 @@ class TestMonitor(unittest.TestCase): logs = f.readlines() self.assertEqual(len(logs), 1) - log = json.loads(logs[0]) - self.assertEqual(log["transaction_type"], "request") - - def test_job(self): - frappe.utils.background_jobs.execute_job( - frappe.local.site, "frappe.ping", None, None, {}, is_async=False - ) - - logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1) - self.assertEqual(len(logs), 1) - log = json.loads(logs[0].decode()) - self.assertEqual(log["transaction_type"], "job") - self.assertEqual(log["method"], "frappe.ping") + log = frappe.parse_json(logs[0]) + self.assertEqual(log.transaction_type, "request") def tearDown(self): frappe.conf.monitor = 0 diff --git a/frappe/utils/background_jobs.py b/frappe/utils/background_jobs.py index 02f2319d55..03f063e058 100755 --- a/frappe/utils/background_jobs.py +++ b/frappe/utils/background_jobs.py @@ -95,7 +95,7 @@ def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, else: method_name = cstr(method.__name__) - frappe.monitor.start(transaction_type="job", method=method_name, kwargs=kwargs) + frappe.monitor.start("job", method_name, kwargs) try: method(**kwargs)