You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

360 lines
8.6 KiB

  1. var app = require('express')();
  2. var server = require('http').Server(app);
  3. var io = require('socket.io')(server);
  4. var cookie = require('cookie');
  5. var fs = require('fs');
  6. var path = require('path');
  7. var request = require('superagent');
  8. var { get_conf, get_redis_subscriber } = require('./node_utils');
  9. const log = console.log; // eslint-disable-line
  10. var conf = get_conf();
  11. var files_struct = {
  12. name: null,
  13. type: null,
  14. size: 0,
  15. data: [],
  16. slice: 0,
  17. site_name: null,
  18. is_private: 0
  19. };
  20. var subscriber = get_redis_subscriber();
  21. // serve socketio
  22. server.listen(conf.socketio_port, function () {
  23. log('listening on *:', conf.socketio_port); //eslint-disable-line
  24. });
  25. // on socket connection
  26. io.on('connection', function (socket) {
  27. if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
  28. return;
  29. }
  30. if (!socket.request.headers.cookie) {
  31. return;
  32. }
  33. const sid = cookie.parse(socket.request.headers.cookie).sid;
  34. if (!sid) {
  35. return;
  36. }
  37. socket.user = cookie.parse(socket.request.headers.cookie).user_id;
  38. socket.files = {};
  39. // frappe.chat
  40. socket.on("frappe.chat.room:subscribe", function (rooms) {
  41. if (!Array.isArray(rooms)) {
  42. rooms = [rooms];
  43. }
  44. for (var room of rooms) {
  45. log('frappe.chat: Subscribing ' + socket.user + ' to room ' + room);
  46. room = get_chat_room(socket, room);
  47. log('frappe.chat: Subscribing ' + socket.user + ' to event ' + room);
  48. socket.join(room);
  49. }
  50. });
  51. socket.on("frappe.chat.message:typing", function (data) {
  52. const user = data.user;
  53. const room = get_chat_room(socket, data.room);
  54. log('frappe.chat: Dispatching ' + user + ' typing to room ' + room);
  55. io.to(room).emit('frappe.chat.room:typing', {
  56. room: data.room,
  57. user: user
  58. });
  59. });
  60. // end frappe.chat
  61. let retries = 0;
  62. let join_chat_room = () => {
  63. request.get(get_url(socket, '/api/method/frappe.realtime.get_user_info'))
  64. .type('form')
  65. .query({
  66. sid: sid
  67. })
  68. .then(res => {
  69. const room = get_user_room(socket, res.body.message.user);
  70. socket.join(room);
  71. socket.join(get_site_room(socket));
  72. })
  73. .catch(e => {
  74. if (e.code === 'ECONNREFUSED' && retries < 5) {
  75. // retry after 1s
  76. retries += 1;
  77. return setTimeout(join_chat_room, 1000);
  78. }
  79. log(`Unable to join chat room. ${e}`);
  80. });
  81. };
  82. join_chat_room();
  83. socket.on('disconnect', function () {
  84. delete socket.files;
  85. });
  86. socket.on('task_subscribe', function (task_id) {
  87. var room = get_task_room(socket, task_id);
  88. socket.join(room);
  89. });
  90. socket.on('task_unsubscribe', function (task_id) {
  91. var room = get_task_room(socket, task_id);
  92. socket.leave(room);
  93. });
  94. socket.on('progress_subscribe', function (task_id) {
  95. var room = get_task_room(socket, task_id);
  96. socket.join(room);
  97. send_existing_lines(task_id, socket);
  98. });
  99. socket.on('doc_subscribe', function (doctype, docname) {
  100. can_subscribe_doc({
  101. socket,
  102. sid,
  103. doctype,
  104. docname,
  105. callback: () => {
  106. var room = get_doc_room(socket, doctype, docname);
  107. socket.join(room);
  108. }
  109. });
  110. });
  111. socket.on('doc_unsubscribe', function (doctype, docname) {
  112. var room = get_doc_room(socket, doctype, docname);
  113. socket.leave(room);
  114. });
  115. socket.on('task_unsubscribe', function (task_id) {
  116. var room = 'task:' + task_id;
  117. socket.leave(room);
  118. });
  119. socket.on('doc_open', function (doctype, docname) {
  120. // show who is currently viewing the form
  121. can_subscribe_doc({
  122. socket: socket,
  123. sid: sid,
  124. doctype: doctype,
  125. docname: docname,
  126. callback: () => {
  127. var room = get_open_doc_room(socket, doctype, docname);
  128. socket.join(room);
  129. send_viewers({
  130. socket: socket,
  131. doctype: doctype,
  132. docname: docname,
  133. });
  134. }
  135. });
  136. });
  137. socket.on('doc_close', function (doctype, docname) {
  138. // remove this user from the list of 'who is currently viewing the form'
  139. var room = get_open_doc_room(socket, doctype, docname);
  140. socket.leave(room);
  141. send_viewers({
  142. socket: socket,
  143. doctype: doctype,
  144. docname: docname,
  145. });
  146. });
  147. socket.on('upload-accept-slice', (data) => {
  148. try {
  149. if (!socket.files[data.name]) {
  150. socket.files[data.name] = Object.assign({}, files_struct, data);
  151. socket.files[data.name].data = [];
  152. }
  153. //convert the ArrayBuffer to Buffer
  154. data.data = new Buffer(new Uint8Array(data.data));
  155. //save the data
  156. socket.files[data.name].data.push(data.data);
  157. socket.files[data.name].slice++;
  158. if (socket.files[data.name].slice * 24576 >= socket.files[data.name].size) {
  159. // do something with the data
  160. var fileBuffer = Buffer.concat(socket.files[data.name].data);
  161. const file_url = path.join((socket.files[data.name].is_private ? 'private' : 'public'),
  162. 'files', data.name);
  163. const file_path = path.join('sites', get_site_name(socket), file_url);
  164. fs.writeFile(file_path, fileBuffer, (err) => {
  165. delete socket.files[data.name];
  166. if (err) return socket.emit('upload error');
  167. socket.emit('upload-end', {
  168. file_url: '/' + file_url
  169. });
  170. });
  171. } else {
  172. socket.emit('upload-request-slice', {
  173. currentSlice: socket.files[data.name].slice
  174. });
  175. }
  176. } catch (e) {
  177. log(e);
  178. socket.emit('upload-error', {
  179. error: e.message
  180. });
  181. }
  182. });
  183. });
  184. subscriber.on("message", function (_channel, message) {
  185. message = JSON.parse(message);
  186. if (message.room) {
  187. io.to(message.room).emit(message.event, message.message);
  188. } else {
  189. io.emit(message.event, message.message);
  190. }
  191. });
  192. subscriber.subscribe("events");
  193. function send_existing_lines(task_id, socket) {
  194. var room = get_task_room(socket, task_id);
  195. subscriber.hgetall('task_log:' + task_id, function (_err, lines) {
  196. io.to(room).emit('task_progress', {
  197. "task_id": task_id,
  198. "message": {
  199. "lines": lines
  200. }
  201. });
  202. });
  203. }
  204. function get_doc_room(socket, doctype, docname) {
  205. return get_site_name(socket) + ':doc:' + doctype + '/' + docname;
  206. }
  207. function get_open_doc_room(socket, doctype, docname) {
  208. return get_site_name(socket) + ':open_doc:' + doctype + '/' + docname;
  209. }
  210. function get_user_room(socket, user) {
  211. return get_site_name(socket) + ':user:' + user;
  212. }
  213. function get_site_room(socket) {
  214. return get_site_name(socket) + ':all';
  215. }
  216. function get_task_room(socket, task_id) {
  217. return get_site_name(socket) + ':task_progress:' + task_id;
  218. }
  219. // frappe.chat
  220. // If you're thinking on multi-site or anything, please
  221. // update frappe.async as well.
  222. function get_chat_room(socket, room) {
  223. var room = get_site_name(socket) + ":room:" + room;
  224. return room
  225. }
  226. function get_site_name(socket) {
  227. if (socket.request.headers['x-frappe-site-name']) {
  228. return get_hostname(socket.request.headers['x-frappe-site-name']);
  229. } else if (['localhost', '127.0.0.1'].indexOf(socket.request.headers.host) !== -1 &&
  230. conf.default_site) {
  231. // from currentsite.txt since host is localhost
  232. return conf.default_site;
  233. } else if (socket.request.headers.origin) {
  234. return get_hostname(socket.request.headers.origin);
  235. } else {
  236. return get_hostname(socket.request.headers.host);
  237. }
  238. }
  239. function get_hostname(url) {
  240. if (!url) return undefined;
  241. if (url.indexOf("://") > -1) {
  242. url = url.split('/')[2];
  243. }
  244. return (url.match(/:/g)) ? url.slice(0, url.indexOf(":")) : url
  245. }
  246. function get_url(socket, path) {
  247. if (!path) {
  248. path = '';
  249. }
  250. return socket.request.headers.origin + path;
  251. }
  252. function can_subscribe_doc(args) {
  253. if (!args) return;
  254. if (!args.doctype || !args.docname) return;
  255. request.get(get_url(args.socket, '/api/method/frappe.realtime.can_subscribe_doc'))
  256. .type('form')
  257. .query({
  258. sid: args.sid,
  259. doctype: args.doctype,
  260. docname: args.docname
  261. })
  262. .end(function (err, res) {
  263. if (!res) {
  264. log("No response for doc_subscribe");
  265. } else if (res.status == 403) {
  266. return;
  267. } else if (err) {
  268. log(err);
  269. } else if (res.status == 200) {
  270. args.callback(err, res);
  271. } else {
  272. log("Something went wrong", err, res);
  273. }
  274. });
  275. }
  276. function send_viewers(args) {
  277. // send to doc room, 'users currently viewing this document'
  278. if (!(args && args.doctype && args.docname)) {
  279. return;
  280. }
  281. // open doc room
  282. var room = get_open_doc_room(args.socket, args.doctype, args.docname);
  283. var socketio_room = io.sockets.adapter.rooms[room] || {};
  284. // for compatibility with both v1.3.7 and 1.4.4
  285. var clients_dict = ("sockets" in socketio_room) ? socketio_room.sockets : socketio_room;
  286. // socket ids connected to this room
  287. var clients = Object.keys(clients_dict || {});
  288. var viewers = [];
  289. for (var i in io.sockets.sockets) {
  290. var s = io.sockets.sockets[i];
  291. if (clients.indexOf(s.id) !== -1) {
  292. // this socket is connected to the room
  293. viewers.push(s.user);
  294. }
  295. }
  296. // notify
  297. io.to(room).emit("doc_viewers", {
  298. doctype: args.doctype,
  299. docname: args.docname,
  300. viewers: viewers
  301. });
  302. }