Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 
 
 

112 строк
3.0 KiB

  1. # -*- coding: utf-8 -*-
  2. # Copyright (c) 2020, Frappe Technologies Pvt. Ltd. and Contributors
  3. # License: MIT. See LICENSE
  4. from datetime import datetime
  5. import json
  6. import traceback
  7. import frappe
  8. import os
  9. import uuid
  10. import rq
  11. MONITOR_REDIS_KEY = "monitor-transactions"
  12. MONITOR_MAX_ENTRIES = 1000000
  def start(transaction_type="request", method=None, kwargs=None):
      """Begin monitoring the current transaction if monitoring is enabled.

      When ``frappe.conf.monitor`` is truthy, a new ``Monitor`` built from the
      given arguments is attached to ``frappe.local.monitor``; otherwise
      nothing happens.
      """
      if not frappe.conf.monitor:
          return
      frappe.local.monitor = Monitor(transaction_type, method, kwargs)
  def stop(response=None):
      """Finish monitoring by dumping the monitor attached to ``frappe.local``.

      Does nothing when no monitor was attached to ``frappe.local``.
      ``response`` is forwarded unchanged to ``Monitor.dump``.
      """
      if not hasattr(frappe.local, "monitor"):
          return
      active_monitor = frappe.local.monitor
      active_monitor.dump(response)
  def log_file():
      """Return the path ``<bench path>/logs/monitor.json.log``."""
      bench_root = frappe.utils.get_bench_path()
      return os.path.join(bench_root, "logs", "monitor.json.log")
  class Monitor:
      """Record metadata and timing for a single transaction.

      The collected values live in ``self.data`` (a ``frappe._dict``) and are
      serialized to JSON and pushed onto the ``MONITOR_REDIS_KEY`` list by
      ``store``. Errors raised while collecting or dumping are printed with
      ``traceback.print_exc()`` and not propagated.
      """

      def __init__(self, transaction_type, method, kwargs):
          """Capture the base fields, then request or job specific metadata.

          ``transaction_type == "request"`` selects request metadata; any
          other value selects job metadata built from ``method`` and
          ``kwargs``.
          """
          try:
              base_fields = {
                  "site": frappe.local.site,
                  "timestamp": datetime.utcnow(),
                  "transaction_type": transaction_type,
                  "uuid": str(uuid.uuid4()),
              }
              self.data = frappe._dict(base_fields)

              if transaction_type != "request":
                  self.collect_job_meta(method, kwargs)
              else:
                  self.collect_request_meta()
          except Exception:
              traceback.print_exc()

      def collect_request_meta(self):
          """Store the request's IP, HTTP method and path under ``data.request``."""
          request_meta = frappe._dict()
          request_meta["ip"] = frappe.local.request_ip
          request_meta["method"] = frappe.request.method
          request_meta["path"] = frappe.request.path
          self.data.request = request_meta

      def collect_job_meta(self, method, kwargs):
          """Store job details under ``data.job``.

          When ``method`` contains ``"run_scheduled_job"`` the job is marked as
          scheduled and its method is taken from ``kwargs["job_type"]``. If an
          RQ job is currently running, its id replaces ``data.uuid`` and the
          time between enqueueing and the monitor timestamp is stored in
          microseconds as ``wait``.
          """
          job_meta = frappe._dict({"method": method, "scheduled": False, "wait": 0})
          self.data.job = job_meta

          if "run_scheduled_job" in method:
              job_meta.method = kwargs["job_type"]
              job_meta.scheduled = True

          current_job = rq.get_current_job()
          if not current_job:
              return

          self.data.uuid = current_job.id
          queued_for = self.data.timestamp - current_job.enqueued_at
          job_meta.wait = int(queued_for.total_seconds() * 1000000)

      def dump(self, response=None):
          """Record the duration (and response details for requests), then store.

          Any exception is printed and swallowed; in that case nothing is stored.
          """
          try:
              elapsed = datetime.utcnow() - self.data.timestamp
              # Duration is kept in microseconds.
              self.data.duration = int(elapsed.total_seconds() * 1000000)

              if self.data.transaction_type == "request":
                  self._collect_response_meta(response)

              self.store()
          except Exception:
              traceback.print_exc()

      def _collect_response_meta(self, response):
          """Copy status code, content length and rate-limiter state to ``data.request``."""
          self.data.request.status_code = response.status_code
          self.data.request.response_length = int(response.headers.get("Content-Length", 0))

          if not hasattr(frappe.local, "rate_limiter"):
              return

          limiter = frappe.local.rate_limiter
          self.data.request.counter = limiter.counter
          if limiter.rejected:
              self.data.request.reset = limiter.reset

      def store(self):
          """Serialize ``self.data`` as JSON and append it to the cache list.

          If the list already holds more than ``MONITOR_MAX_ENTRIES`` items, the
          element at the head of the list is trimmed off first.
          """
          cache = frappe.cache()
          if cache.llen(MONITOR_REDIS_KEY) > MONITOR_MAX_ENTRIES:
              cache.ltrim(MONITOR_REDIS_KEY, 1, -1)

          # default=str stringifies values json cannot encode (e.g. the datetime).
          payload = json.dumps(self.data, sort_keys=True, default=str)
          cache.rpush(MONITOR_REDIS_KEY, payload)
  def flush():
      """Append all cached monitor entries to the log file, then drop them.

      Reads the whole ``MONITOR_REDIS_KEY`` list, writes each decoded entry as
      one line to the file returned by ``log_file()``, and finally trims the
      written entries off the head of the list. Entries pushed after the read
      stay in the cache. Any exception is printed with
      ``traceback.print_exc()`` and swallowed.
      """
      try:
          # Fetch all the logs without removing them from the cache.
          logs = frappe.cache().lrange(MONITOR_REDIS_KEY, 0, -1)
          if not logs:
              return

          lines = [frappe.safe_decode(entry) for entry in logs]

          # The third positional argument of open() is `buffering`, not OS open
          # flags, so the previous os.O_NONBLOCK there only set a buffer size
          # (and the attribute is missing on Windows). Use the default buffering.
          with open(log_file(), "a") as f:
              f.write("\n".join(lines) + "\n")

          # LTRIM keeps the inclusive index range [start, stop]. Starting at
          # len(lines) removes exactly the entries written above; starting at
          # len(lines) - 1 would keep the last written entry and duplicate it
          # in the log on the next flush.
          frappe.cache().ltrim(MONITOR_REDIS_KEY, len(lines), -1)
      except Exception:
          traceback.print_exc()