You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

320 rivejä
7.5 KiB

  1. const cookie = require('cookie');
  2. const request = require('superagent');
  3. const { get_conf, get_redis_subscriber } = require('./node_utils');
  // Configuration read through ./node_utils; supplies the socket.io port here.
  const conf = get_conf();
  const log = console.log; // eslint-disable-line
  // Redis client that is subscribed to the "events" channel further down.
  const subscriber = get_redis_subscriber();

  // CORS is left permissive here: the connection handler compares the
  // hostnames of the Host and Origin headers before it attaches any
  // listeners to a socket.
  const socketio_options = {
    cors: {
      origin: true,
      credentials: true
    }
  };
  const io = require('socket.io')(conf.socketio_port, socketio_options);
  // Redis client used to publish `open_in_editor` events; created on first use
  // and then shared by every socket instead of requesting one per event.
  // NOTE(review): assumes get_redis_subscriber() hands out a new client on each
  // call - confirm in ./node_utils.
  let editor_publisher = null;

  // on socket connection
  //
  // Validates the handshake (Host and Origin must name the same host and a
  // `sid` cookie must be present), joins the user and site rooms, and then
  // registers the task / document / typing listeners for this socket.
  //
  // NOTE(review): a socket that fails validation is left connected without any
  // listeners; it is not disconnected, so room-less `io.emit` broadcasts from
  // the Redis relay still reach it.
  io.on('connection', function (socket) {
    const headers = socket.request.headers;

    if (get_hostname(headers.host) !== get_hostname(headers.origin)) {
      return;
    }
    if (!headers.cookie) {
      return;
    }

    // Parse the cookie header once and reuse the result.
    const cookies = cookie.parse(headers.cookie);
    const sid = cookies.sid;
    if (!sid) {
      return;
    }

    // NOTE(review): `user_id` is taken from a client-supplied cookie; in this
    // file it is only used to build the viewer / typer lists in send_users().
    socket.user = cookies.user_id;

    socket.on('task_subscribe', function (task_id) {
      socket.join(get_task_room(socket, task_id));
    });

    // Resolve the session's user via the backend, then join the user room and
    // the site-wide room. Refused connections are retried up to 5 times.
    let retries = 0;
    const join_user_room = () => {
      request.get(get_url(socket, '/api/method/frappe.realtime.get_user_info'))
        .type('form')
        .query({
          sid: sid
        })
        .then(res => {
          const room = get_user_room(socket, res.body.message.user);
          socket.join(room);
          socket.join(get_site_room(socket));
        })
        .catch(e => {
          if (e.code === 'ECONNREFUSED' && retries < 5) {
            // retry after 1s
            retries += 1;
            return setTimeout(join_user_room, 1000);
          }
          log(`Unable to join user room. ${e}`);
        });
    };
    join_user_room();

    // A second `task_unsubscribe` listener used to leave the room
    // 'task:' + task_id; nothing in this file joins a room with that name, so
    // only the listener matching `task_subscribe` is kept.
    socket.on('task_unsubscribe', function (task_id) {
      socket.leave(get_task_room(socket, task_id));
    });

    socket.on('progress_subscribe', function (task_id) {
      socket.join(get_task_room(socket, task_id));
      send_existing_lines(task_id, socket);
    });

    socket.on('doc_subscribe', function (doctype, docname) {
      can_subscribe_doc({
        socket,
        sid,
        doctype,
        docname,
        callback: () => {
          socket.join(get_doc_room(socket, doctype, docname));
        }
      });
    });

    socket.on('doc_unsubscribe', function (doctype, docname) {
      socket.leave(get_doc_room(socket, doctype, docname));
    });

    socket.on('doc_open', function (doctype, docname) {
      can_subscribe_doc({
        socket,
        sid,
        doctype,
        docname,
        callback: () => {
          socket.join(get_open_doc_room(socket, doctype, docname));
          // show who is currently viewing the form
          send_users({ socket, doctype, docname }, 'view');
          // show who is currently typing on the form
          send_users({ socket, doctype, docname }, 'type');
        }
      });
    });

    socket.on('doc_close', function (doctype, docname) {
      // remove this user from the list of 'who is currently viewing the form'
      socket.leave(get_open_doc_room(socket, doctype, docname));
      send_users({ socket, doctype, docname }, 'view');
    });

    socket.on('doc_typing', function (doctype, docname) {
      // show users that are currently typing on the form
      socket.join(get_typing_room(socket, doctype, docname));
      send_users({ socket, doctype, docname }, 'type');
    });

    socket.on('doc_typing_stopped', function (doctype, docname) {
      // remove this user from the list of users currently typing on the form
      socket.leave(get_typing_room(socket, doctype, docname));
      send_users({ socket, doctype, docname }, 'type');
    });

    socket.on('open_in_editor', (data) => {
      if (!editor_publisher) {
        editor_publisher = get_redis_subscriber('redis_socketio');
      }
      editor_publisher.publish('open_in_editor', JSON.stringify(data));
    });
  });
  // Relay each message received from Redis to socket.io clients: to
  // `message.room` when the payload names one, otherwise to every connected
  // socket.
  subscriber.on("message", function (_channel, message) {
    let payload;
    try {
      payload = JSON.parse(message);
    } catch (e) {
      // Without this guard a malformed message would throw out of the event
      // listener uncaught.
      log(`Ignoring malformed realtime message. ${e}`);
      return;
    }
    if (!payload) {
      // e.g. the JSON literal `null` - there is nothing to relay
      return;
    }
    if (payload.room) {
      io.to(payload.room).emit(payload.event, payload.message);
    } else {
      io.emit(payload.event, payload.message);
    }
  });
  subscriber.subscribe("events");
  // Read the hash stored under 'task_log:<task_id>' and emit it as a
  // `task_progress` event to the task's room.
  //
  // task_id: id of the task whose stored lines are requested
  // socket:  socket used to derive the site-specific task room
  //
  // If the lookup fails the error is logged and nothing is emitted, instead of
  // sending a payload whose `lines` is undefined.
  function send_existing_lines(task_id, socket) {
    const room = get_task_room(socket, task_id);
    subscriber.hgetall('task_log:' + task_id, function (err, lines) {
      if (err) {
        log(`Unable to fetch existing lines for task ${task_id}. ${err}`);
        return;
      }
      io.to(room).emit('task_progress', {
        "task_id": task_id,
        "message": {
          "lines": lines
        }
      });
    });
  }
  // Room name for a document: '<site>:doc:<doctype>/<docname>'.
  function get_doc_room(socket, doctype, docname) {
    const site = get_site_name(socket);
    return `${site}:doc:${doctype}/${docname}`;
  }
  // Room name for an opened document: '<site>:open_doc:<doctype>/<docname>'.
  function get_open_doc_room(socket, doctype, docname) {
    const site = get_site_name(socket);
    return `${site}:open_doc:${doctype}/${docname}`;
  }
  // Room name for typing activity on a document: '<site>:typing:<doctype>/<docname>'.
  function get_typing_room(socket, doctype, docname) {
    const site = get_site_name(socket);
    return `${site}:typing:${doctype}/${docname}`;
  }
  // Room name for a single user: '<site>:user:<user>'.
  function get_user_room(socket, user) {
    const site = get_site_name(socket);
    return `${site}:user:${user}`;
  }
  // Site-wide room name: '<site>:all'.
  function get_site_room(socket) {
    const site = get_site_name(socket);
    return `${site}:all`;
  }
  // Room name for a task's progress: '<site>:task_progress:<task_id>'.
  function get_task_room(socket, task_id) {
    const site = get_site_name(socket);
    return `${site}:task_progress:${task_id}`;
  }
  // Work out which site a socket belongs to, in this order of precedence:
  //   1. the `x-frappe-site-name` request header,
  //   2. `conf.default_site` when the Host header is localhost / 127.0.0.1,
  //   3. the hostname of the Origin header,
  //   4. the hostname of the Host header.
  function get_site_name(socket) {
    const headers = socket.request.headers;
    const host_name = get_hostname(headers.host);

    const site_header = headers['x-frappe-site-name'];
    if (site_header) {
      return get_hostname(site_header);
    }

    const is_local_host = ['localhost', '127.0.0.1'].includes(host_name);
    if (is_local_host && conf.default_site) {
      // from currentsite.txt since host is localhost
      return conf.default_site;
    }

    if (headers.origin) {
      return get_hostname(headers.origin);
    }
    return host_name;
  }
  // Extract the hostname from a URL ("scheme://host:port/path") or from a
  // Host-header style value ("host:port").
  //
  // Returns undefined for an empty / missing value. A bracketed IPv6 literal
  // is returned with its brackets (e.g. "[::1]"); previously such a value was
  // cut at its first ":" and came back as "[".
  function get_hostname(url) {
    if (!url) return undefined;

    let host = url;
    if (host.indexOf("://") > -1) {
      // "scheme://authority/..." -> the authority is the third "/"-separated part
      host = host.split('/')[2];
    }

    if (host.charAt(0) === '[') {
      const closing = host.indexOf(']');
      if (closing !== -1) {
        return host.slice(0, closing + 1);
      }
    }

    // strip the port, if any
    const port_at = host.indexOf(':');
    return port_at === -1 ? host : host.slice(0, port_at);
  }
  // Build a URL by appending `path` to the socket's Origin header.
  // A missing (falsy) `path` is treated as the empty string.
  // NOTE(review): the Origin header comes from the client's handshake request.
  function get_url(socket, path) {
    const suffix = path ? path : '';
    return `${socket.request.headers.origin}${suffix}`;
  }
  // Ask the backend whether the session may subscribe to a document and run
  // `args.callback` when it answers with HTTP 200.
  //
  // args: { socket, sid, doctype, docname, callback }
  //   - nothing is requested when args, args.doctype or args.docname is missing
  //   - a 403 answer is ignored silently
  //   - every other failure is logged; when no response object is available
  //     the underlying error is logged too, so the cause is not lost
  function can_subscribe_doc(args) {
    if (!args) return;
    if (!args.doctype || !args.docname) return;

    request.get(get_url(args.socket, '/api/method/frappe.realtime.can_subscribe_doc'))
      .type('form')
      .query({
        sid: args.sid,
        doctype: args.doctype,
        docname: args.docname
      })
      .end(function (err, res) {
        if (!res) {
          log("No response for doc_subscribe", err);
          return;
        }
        if (res.status === 403) {
          // not permitted: stay quiet
          return;
        }
        if (err) {
          log(err);
          return;
        }
        if (res.status === 200) {
          args.callback(err, res);
          return;
        }
        log("Something went wrong", err, res);
      });
  }
  // Tell everyone who has a document open which users are currently viewing
  // it (action 'view' -> event 'doc_viewers') or typing on it (any other
  // action -> event 'doc_typers').
  //
  // args:   { socket, doctype, docname }; nothing happens if doctype or
  //         docname is missing
  // action: 'view' or 'type'
  //
  // The member lookup handles both shapes of the socket.io internals: Maps /
  // Sets (socket.io v3+, where `adapter.rooms` is a Map of Sets and
  // `sockets.sockets` is a Map) and the plain objects of older releases.
  // Indexing a Map with `rooms[room]` always yields undefined, which made the
  // emitted user list permanently empty.
  function send_users(args, action) {
    if (!(args && args.doctype && args.docname)) {
      return;
    }

    const open_doc_room = get_open_doc_room(args.socket, args.doctype, args.docname);
    const room = action === 'view'
      ? open_doc_room
      : get_typing_room(args.socket, args.doctype, args.docname);

    // ids of the sockets that are in `room`
    const rooms = io.sockets.adapter.rooms;
    const members = rooms instanceof Map ? rooms.get(room) : rooms[room];
    let member_ids;
    if (!members) {
      member_ids = new Set();
    } else if (members instanceof Set) {
      member_ids = members;
    } else {
      // legacy: either { sockets: { <id>: true } } or { <id>: true }
      const clients_dict = ('sockets' in members) ? members.sockets : members;
      member_ids = new Set(Object.keys(clients_dict || {}));
    }

    // every socket currently known to the server
    const registry = io.sockets.sockets;
    const connected = registry instanceof Map
      ? Array.from(registry.values())
      : Object.keys(registry).map(key => registry[key]);

    const users = connected
      .filter(s => member_ids.has(s.id))
      .map(s => s.user);

    const emit_event = action === 'view' ? 'doc_viewers' : 'doc_typers';

    // notify (always the viewers' room, also for typing updates)
    io.to(open_doc_room).emit(emit_event, {
      doctype: args.doctype,
      docname: args.docname,
      users: Array.from(new Set(users))
    });
  }