Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

10 роки тому
10 роки тому
9 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
9 роки тому
10 роки тому
10 роки тому
10 роки тому
9 роки тому
9 роки тому
10 роки тому
8 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
9 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
10 роки тому
9 роки тому
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # -*- coding: utf-8 -*-
  2. # Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and contributors
  3. # For license information, please see license.txt
  4. from __future__ import unicode_literals
  5. import frappe
  6. import os
  7. import time
  8. import redis
  9. from frappe.utils import get_site_path
  10. from frappe import conf
  11. END_LINE = '<!-- frappe: end-file -->'
  12. TASK_LOG_MAX_AGE = 86400 # 1 day in seconds
  13. redis_server = None
  14. @frappe.whitelist()
  15. def get_pending_tasks_for_doc(doctype, docname):
  16. return frappe.db.sql_list("select name from `tabAsync Task` where status in ('Queued', 'Running') and reference_doctype=%s and reference_name=%s", (doctype, docname))
  17. def set_task_status(task_id, status, response=None):
  18. if not response:
  19. response = {}
  20. response.update({
  21. "status": status,
  22. "task_id": task_id
  23. })
  24. emit_via_redis("task_status_change", response, room="task:" + task_id)
  25. def remove_old_task_logs():
  26. logs_path = get_site_path('task-logs')
  27. def full_path(_file):
  28. return os.path.join(logs_path, _file)
  29. files_to_remove = [full_path(_file) for _file in os.listdir(logs_path)]
  30. files_to_remove = [_file for _file in files_to_remove if is_file_old(_file) and os.path.isfile(_file)]
  31. for _file in files_to_remove:
  32. os.remove(_file)
  33. def is_file_old(file_path):
  34. return ((time.time() - os.stat(file_path).st_mtime) > TASK_LOG_MAX_AGE)
  35. def publish_progress(percent, title=None, doctype=None, docname=None):
  36. publish_realtime('progress', {'percent': percent, 'title': title},
  37. user=frappe.session.user, doctype=doctype, docname=docname)
  38. def publish_realtime(event=None, message=None, room=None,
  39. user=None, doctype=None, docname=None, task_id=None,
  40. after_commit=False):
  41. """Publish real-time updates
  42. :param event: Event name, like `task_progress` etc. that will be handled by the client (default is `task_progress` if within task or `global`)
  43. :param message: JSON message object. For async must contain `task_id`
  44. :param room: Room in which to publish update (default entire site)
  45. :param user: Transmit to user
  46. :param doctype: Transmit to doctype, docname
  47. :param docname: Transmit to doctype, docname
  48. :param after_commit: (default False) will emit after current transaction is committed"""
  49. if message is None:
  50. message = {}
  51. if event is None:
  52. if getattr(frappe.local, "task_id", None):
  53. event = "task_progress"
  54. else:
  55. event = "global"
  56. if event=='msgprint' and not user:
  57. user = frappe.session.user
  58. if not room:
  59. if not task_id and hasattr(frappe.local, "task_id"):
  60. task_id = frappe.local.task_id
  61. if task_id:
  62. room = get_task_progress_room(task_id)
  63. if not "task_id" in message:
  64. message["task_id"] = task_id
  65. after_commit = False
  66. elif user:
  67. room = get_user_room(user)
  68. elif doctype and docname:
  69. room = get_doc_room(doctype, docname)
  70. else:
  71. room = get_site_room()
  72. if after_commit:
  73. params = [event, message, room]
  74. if not params in frappe.local.realtime_log:
  75. frappe.local.realtime_log.append(params)
  76. else:
  77. emit_via_redis(event, message, room)
  78. def emit_via_redis(event, message, room):
  79. """Publish real-time updates via redis
  80. :param event: Event name, like `task_progress` etc.
  81. :param message: JSON message object. For async must contain `task_id`
  82. :param room: name of the room"""
  83. r = get_redis_server()
  84. try:
  85. r.publish('events', frappe.as_json({'event': event, 'message': message, 'room': room}))
  86. except redis.exceptions.ConnectionError:
  87. # print frappe.get_traceback()
  88. pass
  89. def put_log(line_no, line, task_id=None):
  90. r = get_redis_server()
  91. if not task_id:
  92. task_id = frappe.local.task_id
  93. task_progress_room = get_task_progress_room(task_id)
  94. task_log_key = "task_log:" + task_id
  95. publish_realtime('task_progress', {
  96. "message": {
  97. "lines": {line_no: line}
  98. },
  99. "task_id": task_id
  100. }, room=task_progress_room)
  101. r.hset(task_log_key, line_no, line)
  102. r.expire(task_log_key, 3600)
  103. def get_redis_server():
  104. """returns redis_socketio connection."""
  105. global redis_server
  106. if not redis_server:
  107. from redis import Redis
  108. redis_server = Redis.from_url(conf.get("redis_socketio")
  109. or "redis://localhost:12311")
  110. return redis_server
  111. class FileAndRedisStream(file):
  112. def __init__(self, *args, **kwargs):
  113. ret = super(FileAndRedisStream, self).__init__(*args, **kwargs)
  114. self.count = 0
  115. return ret
  116. def write(self, data):
  117. ret = super(FileAndRedisStream, self).write(data)
  118. if frappe.local.task_id:
  119. put_log(self.count, data, task_id=frappe.local.task_id)
  120. self.count += 1
  121. return ret
  122. def get_std_streams(task_id):
  123. stdout = FileAndRedisStream(get_task_log_file_path(task_id, 'stdout'), 'w')
  124. # stderr = FileAndRedisStream(get_task_log_file_path(task_id, 'stderr'), 'w')
  125. return stdout, stdout
  126. def get_task_log_file_path(task_id, stream_type):
  127. logs_dir = frappe.utils.get_site_path('task-logs')
  128. return os.path.join(logs_dir, task_id + '.' + stream_type)
  129. @frappe.whitelist(allow_guest=True)
  130. def can_subscribe_doc(doctype, docname, sid):
  131. from frappe.sessions import Session
  132. from frappe.exceptions import PermissionError
  133. session = Session(None, resume=True).get_session_data()
  134. if not frappe.has_permission(user=session.user, doctype=doctype, doc=docname, ptype='read'):
  135. raise PermissionError()
  136. return True
  137. @frappe.whitelist(allow_guest=True)
  138. def get_user_info(sid):
  139. from frappe.sessions import Session
  140. session = Session(None, resume=True).get_session_data()
  141. return {
  142. 'user': session.user,
  143. }
  144. def get_doc_room(doctype, docname):
  145. return ''.join([frappe.local.site, ':doc:', doctype, '/', docname])
  146. def get_user_room(user):
  147. return ''.join([frappe.local.site, ':user:', user])
  148. def get_site_room():
  149. return ''.join([frappe.local.site, ':all'])
  150. def get_task_progress_room(task_id):
  151. return "".join([frappe.local.site, ":task_progress:", task_id])