You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

290 lines
8.3 KiB

  1. # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and Contributors
  2. # MIT License. See license.txt
  3. """
  4. Events:
  5. always
  6. daily
  7. monthly
  8. weekly
  9. """
  10. from __future__ import unicode_literals
  11. import frappe
  12. import json
  13. import schedule
  14. import time
  15. import os
  16. import frappe.utils
  17. from frappe.utils import get_sites, get_site_path, touch_file
  18. from datetime import datetime
  19. from background_jobs import enqueue, get_jobs, queue_timeout
  20. from frappe.limits import has_expired
  21. from frappe.utils.data import get_datetime, now_datetime
  22. from frappe.core.doctype.user.user import STANDARD_USERS
  23. from frappe.installer import update_site_config
  24. DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
  25. def start_scheduler():
  26. '''Run enqueue_events_for_all_sites every 2 minutes (default).
  27. Specify scheduler_interval in seconds in common_site_config.json'''
  28. interval = frappe.get_conf().scheduler_interval or 120
  29. schedule.every(interval).seconds.do(enqueue_events_for_all_sites)
  30. while True:
  31. schedule.run_pending()
  32. time.sleep(1)
  33. def enqueue_events_for_all_sites():
  34. '''Loop through sites and enqueue events that are not already queued'''
  35. with frappe.init_site():
  36. jobs_per_site = get_jobs()
  37. sites = get_sites()
  38. for site in sites:
  39. try:
  40. enqueue_events_for_site(site=site, queued_jobs=jobs_per_site[site])
  41. except:
  42. # it should try to enqueue other sites
  43. print frappe.get_traceback()
  44. def enqueue_events_for_site(site, queued_jobs):
  45. try:
  46. frappe.init(site=site)
  47. if frappe.local.conf.maintenance_mode:
  48. return
  49. if frappe.local.conf.pause_scheduler:
  50. return
  51. frappe.connect()
  52. if is_scheduler_disabled():
  53. return
  54. enqueue_events(site=site, queued_jobs=queued_jobs)
  55. # TODO this print call is a tempfix till logging is fixed!
  56. print 'Queued events for site {0}'.format(site)
  57. frappe.logger(__name__).debug('Queued events for site {0}'.format(site))
  58. except:
  59. frappe.logger(__name__).error('Exception in Enqueue Events for Site {0}'.format(site))
  60. raise
  61. finally:
  62. frappe.destroy()
  63. def enqueue_events(site, queued_jobs):
  64. nowtime = frappe.utils.now_datetime()
  65. last = frappe.db.get_value('System Settings', 'System Settings', 'scheduler_last_event')
  66. # set scheduler last event
  67. frappe.db.begin()
  68. frappe.db.set_value('System Settings', 'System Settings',
  69. 'scheduler_last_event', nowtime.strftime(DATETIME_FORMAT),
  70. update_modified=False)
  71. frappe.db.commit()
  72. out = []
  73. if last:
  74. last = datetime.strptime(last, DATETIME_FORMAT)
  75. out = enqueue_applicable_events(site, nowtime, last, queued_jobs)
  76. return '\n'.join(out)
  77. def enqueue_applicable_events(site, nowtime, last, queued_jobs=()):
  78. nowtime_str = nowtime.strftime(DATETIME_FORMAT)
  79. out = []
  80. enabled_events = get_enabled_scheduler_events()
  81. def trigger_if_enabled(site, event):
  82. if event in enabled_events:
  83. trigger(site, event, queued_jobs)
  84. _log(event)
  85. def _log(event):
  86. out.append("{time} - {event} - queued".format(time=nowtime_str, event=event))
  87. if nowtime.day != last.day:
  88. # if first task of the day execute daily tasks
  89. trigger_if_enabled(site, "daily")
  90. trigger_if_enabled(site, "daily_long")
  91. if nowtime.month != last.month:
  92. trigger_if_enabled(site, "monthly")
  93. trigger_if_enabled(site, "monthly_long")
  94. if nowtime.weekday()==0:
  95. trigger_if_enabled(site, "weekly")
  96. trigger_if_enabled(site, "weekly_long")
  97. if "all" not in enabled_events:
  98. trigger(site, "all", queued_jobs)
  99. if "hourly" not in enabled_events:
  100. trigger(site, "hourly", queued_jobs)
  101. if nowtime.hour != last.hour:
  102. trigger_if_enabled(site, "hourly")
  103. trigger_if_enabled(site, "hourly_long")
  104. trigger_if_enabled(site, "all")
  105. return out
  106. def trigger(site, event, queued_jobs=(), now=False):
  107. """trigger method in hooks.scheduler_events"""
  108. queue = 'long' if event.endswith('_long') else 'short'
  109. timeout = queue_timeout[queue]
  110. if not queued_jobs and not now:
  111. queued_jobs = get_jobs(site=site, queue=queue)
  112. for handler in frappe.get_hooks("scheduler_events").get(event, []):
  113. if not now:
  114. if handler not in queued_jobs:
  115. enqueue(handler, queue, timeout, event)
  116. else:
  117. scheduler_task(site=site, event=event, handler=handler, now=True)
  118. if frappe.flags.in_test:
  119. frappe.flags.ran_schedulers.append(event)
  120. def log(method, message=None):
  121. """log error in patch_log"""
  122. message = frappe.utils.cstr(message) + "\n" if message else ""
  123. message += frappe.get_traceback()
  124. if not (frappe.db and frappe.db._conn):
  125. frappe.connect()
  126. frappe.db.rollback()
  127. frappe.db.begin()
  128. d = frappe.new_doc("Scheduler Log")
  129. d.method = method
  130. d.error = message
  131. d.insert(ignore_permissions=True)
  132. frappe.db.commit()
  133. return message
  134. def get_enabled_scheduler_events():
  135. enabled_events = frappe.db.get_global("enabled_scheduler_events")
  136. if enabled_events:
  137. return json.loads(enabled_events)
  138. return ["all", "hourly", "hourly_long", "daily", "daily_long",
  139. "weekly", "weekly_long", "monthly", "monthly_long"]
  140. def is_scheduler_disabled():
  141. if frappe.conf.disable_scheduler:
  142. return True
  143. return not frappe.utils.cint(frappe.db.get_single_value("System Settings", "enable_scheduler"))
  144. def toggle_scheduler(enable):
  145. ss = frappe.get_doc("System Settings")
  146. ss.enable_scheduler = 1 if enable else 0
  147. ss.flags.ignore_mandatory = True
  148. ss.save()
  149. def enable_scheduler():
  150. toggle_scheduler(True)
  151. def disable_scheduler():
  152. toggle_scheduler(False)
  153. def get_errors(from_date, to_date, limit):
  154. errors = frappe.db.sql("""select modified, method, error from `tabScheduler Log`
  155. where date(modified) between %s and %s
  156. and error not like '%%[Errno 110] Connection timed out%%'
  157. order by modified limit %s""", (from_date, to_date, limit), as_dict=True)
  158. return ["""<p>Time: {modified}</p><pre><code>Method: {method}\n{error}</code></pre>""".format(**e)
  159. for e in errors]
  160. def get_error_report(from_date=None, to_date=None, limit=10):
  161. from frappe.utils import get_url, now_datetime, add_days
  162. if not from_date:
  163. from_date = add_days(now_datetime().date(), -1)
  164. if not to_date:
  165. to_date = add_days(now_datetime().date(), -1)
  166. errors = get_errors(from_date, to_date, limit)
  167. if errors:
  168. return 1, """<h4>Scheduler Failed Events (max {limit}):</h4>
  169. <p>URL: <a href="{url}" target="_blank">{url}</a></p><hr>{errors}""".format(
  170. limit=limit, url=get_url(), errors="<hr>".join(errors))
  171. else:
  172. return 0, "<p>Scheduler didn't encounter any problems.</p>"
  173. def scheduler_task(site, event, handler, now=False):
  174. '''This is a wrapper function that runs a hooks.scheduler_events method'''
  175. frappe.logger(__name__).info('running {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
  176. try:
  177. if not now:
  178. frappe.connect(site=site)
  179. frappe.flags.in_scheduler = True
  180. frappe.get_attr(handler)()
  181. except Exception:
  182. frappe.db.rollback()
  183. traceback = log(handler, "Method: {event}, Handler: {handler}".format(event=event, handler=handler))
  184. frappe.logger(__name__).error(traceback)
  185. raise
  186. else:
  187. frappe.db.commit()
  188. frappe.logger(__name__).info('ran {handler} for {site} for event: {event}'.format(handler=handler, site=site, event=event))
  189. def reset_enabled_scheduler_events(login_manager):
  190. if login_manager.info.user_type == "System User":
  191. try:
  192. frappe.db.set_global('enabled_scheduler_events', None)
  193. except MySQLdb.OperationalError, e:
  194. if e.args[0]==1205:
  195. frappe.get_logger().error("Error in reset_enabled_scheduler_events")
  196. else:
  197. raise
  198. else:
  199. is_dormant = frappe.conf.get('dormant')
  200. if is_dormant:
  201. update_site_config('dormant', 'None')
  202. def disable_scheduler_on_expiry():
  203. if has_expired():
  204. disable_scheduler()
  205. def restrict_scheduler_events_if_dormant():
  206. if is_dormant():
  207. restrict_scheduler_events()
  208. update_site_config('dormant', True)
  209. def restrict_scheduler_events(*args, **kwargs):
  210. val = json.dumps(["daily", "daily_long", "weekly", "weekly_long", "monthly", "monthly_long"])
  211. frappe.db.set_global('enabled_scheduler_events', val)
  212. def is_dormant(since = 345600):
  213. last_active = get_datetime(get_last_active())
  214. # Get now without tz info
  215. now = now_datetime().replace(tzinfo=None)
  216. time_since_last_active = now - last_active
  217. if time_since_last_active.total_seconds() > since: # 4 days
  218. return True
  219. return False
  220. def get_last_active():
  221. return frappe.db.sql("""select max(ifnull(last_active, "2000-01-01 00:00:00")) from `tabUser`
  222. where user_type = 'System User' and name not in ({standard_users})"""\
  223. .format(standard_users=", ".join(["%s"]*len(STANDARD_USERS))),
  224. STANDARD_USERS)[0][0]