You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

243 lines
6.6 KiB

  1. from __future__ import unicode_literals, print_function
  2. import redis
  3. from rq import Connection, Queue, Worker
  4. from rq.logutils import setup_loghandlers
  5. from frappe.utils import cstr
  6. from collections import defaultdict
  7. import frappe
  8. import os, socket, time
  9. from frappe import _
  10. from six import string_types
  11. from uuid import uuid4
  12. import frappe.monitor
  13. # imports - third-party imports
  # Fallback job timeout.
  default_timeout = 300

  # Timeout applied to a job when the caller does not pass one, keyed by
  # queue name. The keys also define the set of valid queue names.
  queue_timeout = {
      'background': 2500,
      'long': 1500,
      'default': 300,
      'short': 300,
  }

  # Lazily created, process-wide redis connection (see get_redis_conn).
  redis_connection = None
  def enqueue(method, queue='default', timeout=None, event=None,
          is_async=True, job_name=None, now=False, enqueue_after_commit=False, **kwargs):
      '''
      Schedule `method` for execution by a background worker.

      :param method: dotted-path string or callable to run
      :param queue: name of the target queue; must be one of the keys of `queue_timeout`
          (background, long, default or short)
      :param timeout: job timeout; when falsy, the timeout configured for `queue` is used,
          falling back to 300
      :param event: stored in the job arguments (original note: lets jobs be cleared from queues)
      :param is_async: handed to the queue and to the job; if False the method is executed
          immediately instead of via a worker
      :param job_name: label for this call, defaults to `cstr(method)`; can be used to
          prevent duplicate calls
      :param now: if True (or while `frappe.flags.in_migrate` is set) the method is run
          right away through `frappe.call` and its result is returned
      :param enqueue_after_commit: if True the job is not queued here; its description is
          appended to `frappe.flags.enqueue_after_commit` and that list is returned
      :param kwargs: keyword arguments passed on to the method
      '''
      # Older callers pass `async` instead of `is_async`; honour it when present.
      is_async = kwargs.pop('async', is_async)

      run_inline = now or frappe.flags.in_migrate
      if run_inline:
          return frappe.call(method, **kwargs)

      target_queue = get_queue(queue, is_async=is_async)
      timeout = timeout or queue_timeout.get(queue) or 300

      # Everything execute_job needs to rebuild the context inside the worker.
      job_payload = {
          "site": frappe.local.site,
          "user": frappe.session.user,
          "method": method,
          "event": event,
          "job_name": job_name or cstr(method),
          "is_async": is_async,
          "kwargs": kwargs
      }

      if not enqueue_after_commit:
          return target_queue.enqueue_call(execute_job, timeout=timeout,
              kwargs=job_payload)

      # Deferred mode: only record the request on frappe.flags.
      if not frappe.flags.enqueue_after_commit:
          frappe.flags.enqueue_after_commit = []

      frappe.flags.enqueue_after_commit.append({
          "queue": queue,
          "is_async": is_async,
          "timeout": timeout,
          "queue_args": job_payload
      })
      return frappe.flags.enqueue_after_commit
  def enqueue_doc(doctype, name=None, method=None, queue='default', timeout=300,
          now=False, **kwargs):
      '''Enqueue a document method call; nothing is returned.

      The job runs `run_doc_method`, which loads the document identified by
      `doctype` / `name` and calls `method` on it with `kwargs`.
      `queue`, `timeout` and `now` are forwarded to `enqueue` unchanged.
      '''
      runner_path = 'frappe.utils.background_jobs.run_doc_method'
      enqueue(
          runner_path,
          doctype=doctype,
          name=name,
          doc_method=method,
          queue=queue,
          timeout=timeout,
          now=now,
          **kwargs
      )
  def run_doc_method(doctype, name, doc_method, **kwargs):
      '''Load the document `doctype`/`name` and call its `doc_method` with `kwargs`.

      The method's return value is discarded.
      '''
      document = frappe.get_doc(doctype, name)
      bound_method = getattr(document, doc_method)
      bound_method(**kwargs)
  def execute_job(site, method, event, job_name, kwargs, user=None, is_async=True, retry=0):
      '''Executes job in a worker, performs commit/rollback and logs if there is any error

      :param site: site to connect to when running asynchronously
      :param method: dotted-path string or callable to execute
      :param event: accepted for the job signature; not used in this function
      :param job_name: accepted for the job signature; not used in this function
      :param kwargs: keyword arguments passed to `method`
      :param user: if set (and `is_async`), the job runs as this user
      :param is_async: when True this function connects to `site` on entry and
          destroys the frappe context on exit
      :param retry: number of retries already performed; a job is retried while this is
          below 5, sleeping `retry + 1` seconds before each retry

      Any exception raised by `method` that is not retried is logged and re-raised.
      '''
      if is_async:
          frappe.connect(site)
          if os.environ.get('CI'):
              frappe.flags.in_test = True

          if user:
              frappe.set_user(user)

      # Remember what the caller supplied: a retry must receive the same value so
      # that a dotted path is resolved again and keeps its full name in logs.
      requested_method = method

      if isinstance(method, string_types):
          method_name = method
          method = frappe.get_attr(method)
      else:
          method_name = cstr(method.__name__)

      frappe.monitor.start("job", method_name, kwargs)
      try:
          method(**kwargs)

      except (frappe.db.InternalError, frappe.RetryBackgroundJobError) as e:
          frappe.db.rollback()

          if (retry < 5 and
              (isinstance(e, frappe.RetryBackgroundJobError) or
                  (frappe.db.is_deadlocked(e) or frappe.db.is_timedout(e)))):
              # retry the job if
              # 1213 = deadlock
              # 1205 = lock wait timeout
              # or RetryBackgroundJobError is explicitly raised
              frappe.destroy()
              time.sleep(retry+1)

              # Forward `user` as well: without it the retried job would not run
              # as the user who enqueued it.
              return execute_job(site, requested_method, event, job_name, kwargs,
                  user=user, is_async=is_async, retry=retry+1)

          else:
              frappe.log_error(method_name)
              raise

      except:  # noqa: E722 - deliberate catch-all; the exception is always re-raised
          frappe.db.rollback()
          frappe.log_error(method_name)
          # commit so that the error log written above survives the rollback
          frappe.db.commit()
          print(frappe.get_traceback())
          raise

      else:
          frappe.db.commit()

      finally:
          frappe.monitor.stop()
          if is_async:
              frappe.destroy()
  def start_worker(queue=None, quiet=False):
      '''Start an rq worker on the configured redis connection.

      :param queue: queue name (or list of names) to monitor; all known queues when falsy
      :param quiet: log at WARNING level instead of INFO
      '''
      with frappe.init_site():
          # empty init is required to get redis_queue from common_site_config.json
          conn = get_redis_conn()

      if os.environ.get('CI'):
          setup_loghandlers('ERROR')

      log_level = "WARNING" if quiet else "INFO"

      with Connection(conn):
          monitored_queues = get_queue_list(queue)
          worker = Worker(monitored_queues, name=get_worker_name(queue))
          worker.work(logging_level=log_level)
  def get_worker_name(queue):
      '''Build a unique worker name when the worker is limited to a specific queue.

      Returns None when `queue` is falsy. Otherwise returns
      "<random hex uuid>.<hostname>.<pid>.<queue>".
      '''
      if not queue:
          return None

      # original note: hostname.pid is the default worker name
      name_parts = {
          'uuid': uuid4().hex,
          'hostname': socket.gethostname(),
          'pid': os.getpid(),
          'queue': queue,
      }
      return '{uuid}.{hostname}.{pid}.{queue}'.format(**name_parts)
  def get_jobs(site=None, queue=None, key='method'):
      '''Collect the value of `key` for queued jobs, grouped by site.

      :param site: only include jobs of this site; all sites when None
      :param queue: queue name or list of names to inspect; all known queues when falsy
      :param key: job argument to collect, looked up first in the job's kwargs and
          then in the nested 'kwargs' entry
      :return: defaultdict(list) mapping site -> list of collected values
      '''
      jobs_per_site = defaultdict(list)

      for queue_name in get_queue_list(queue):
          for job in get_queue(queue_name).jobs:
              job_site = job.kwargs.get('site')

              if not job_site:
                  print('No site found in job', job.__dict__)
                  continue

              if site is not None and job_site != site:
                  continue

              if key in job.kwargs:
                  jobs_per_site[job_site].append(job.kwargs[key])
              elif key in job.kwargs.get('kwargs', {}):
                  # optional keyword arguments are stored in 'kwargs' of 'kwargs'
                  jobs_per_site[job_site].append(job.kwargs['kwargs'][key])

      return jobs_per_site
  def get_queue_list(queue_list=None):
      '''Return the queues to work with.

      With no argument, returns every queue name defined in `queue_timeout`.
      Otherwise a single name is wrapped in a list, each name is validated
      and the (possibly wrapped) list is returned.
      '''
      known_queues = list(queue_timeout)

      if not queue_list:
          return known_queues

      if isinstance(queue_list, string_types):
          queue_list = [queue_list]

      for queue_name in queue_list:
          validate_queue(queue_name, known_queues)

      return queue_list
  def get_queue(queue, is_async=True):
      '''Validate the queue name and return a Queue bound to the shared redis connection.'''
      validate_queue(queue)
      connection = get_redis_conn()
      return Queue(queue, connection=connection, is_async=is_async)
  def validate_queue(queue, default_queue_list=None):
      '''Throw (via frappe.throw) unless `queue` is one of the allowed queue names.

      :param queue: queue name to check
      :param default_queue_list: allowed names; the keys of `queue_timeout` when falsy
      '''
      allowed_queues = default_queue_list or list(queue_timeout)

      if queue in allowed_queues:
          return

      frappe.throw(_("Queue should be one of {0}").format(', '.join(allowed_queues)))
  def get_redis_conn():
      '''Return the module-wide redis connection, creating it on first use.

      Raises Exception when frappe has not been initialised (no `frappe.local.conf`)
      or when `redis_queue` is not configured.
      '''
      global redis_connection

      if not hasattr(frappe.local, 'conf'):
          raise Exception('You need to call frappe.init')

      if not frappe.local.conf.redis_queue:
          raise Exception('redis_queue missing in common_site_config.json')

      # Create the connection once and reuse it for later calls.
      if not redis_connection:
          redis_connection = redis.from_url(frappe.local.conf.redis_queue)

      return redis_connection
  def enqueue_test_job():
      '''Enqueue `test_job` with s=100 on the default queue.'''
      test_job_path = 'frappe.utils.background_jobs.test_job'
      enqueue(test_job_path, s=100)
  def test_job(s):
      '''Print a notice, then block for `s` seconds.'''
      from time import sleep

      print('sleeping...')
      sleep(s)