You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

424 lines
10 KiB

  // Third-party and core modules used by the realtime server.
  const app = require('express')();
  const server = require('http').Server(app);
  const io = require('socket.io')(server);
  const cookie = require('cookie');
  const fs = require('fs');
  const path = require('path');
  const redis = require("redis");
  const request = require('superagent');

  // Merged configuration (built-in defaults overridden by the bench config files).
  const conf = get_conf();

  // Per-session connection counters, keyed by the `sid` cookie value.
  const flags = {};

  // Template for the per-file upload state kept on each socket.
  const files_struct = {
    name: null,
    type: null,
    size: 0,
    data: [],
    slice: 0,
    site_name: null,
    is_private: 0
  };

  // Redis connection that receives the events published for socket.io clients.
  const subscriber = redis.createClient(conf.redis_socketio || conf.redis_async_broker_port);
  // Start the HTTP server that socket.io is attached to.
  server.listen(conf.socketio_port, () => {
    console.log('listening on *:', conf.socketio_port); //eslint-disable-line
  });
  // Test route: answers GET / with index.html.
  // NOTE(review): `res.sendfile` is the legacy Express spelling; confirm the
  // installed Express version before switching to `res.sendFile`.
  app.get('/', (req, res) => {
    res.sendfile('index.html');
  });
  // on socket connection
  //
  // Validates the handshake (Origin must match Host, a `sid` cookie must be
  // present), throttles rapid reconnects per session, joins the socket to its
  // user/site rooms and registers every per-socket event handler.
  io.on('connection', function (socket) {
    if (get_hostname(socket.request.headers.host) !== get_hostname(socket.request.headers.origin)) {
      return;
    }

    if (!socket.request.headers.cookie) {
      return;
    }

    const cookies = cookie.parse(socket.request.headers.cookie);
    const sid = cookies.sid;
    if (!sid) {
      return;
    }

    // firefox reconnects multiple times on boot, so allow a few
    // rapid reconnections before throttling. The counter must only be
    // initialised when it is missing; resetting it on every connection
    // would keep it from ever crossing the limit.
    const recent_connections = typeof flags[sid] === 'number' ? flags[sid] : 0;
    if (recent_connections > 4) {
      // throttle this function
      return;
    }
    flags[sid] = recent_connections + 1;
    setTimeout(function () {
      // drop the key entirely so `flags` does not grow with every session
      delete flags[sid];
    }, 10000);

    socket.user = cookies.user_id;
    socket.files = {};

    // frappe.model
    // Realtime Database Updates FTW!

    // Builds "<site>:<doctype>:<name>:<field>", skipping empty parts.
    function get_model_room(doctype, name, field) {
      const site = get_site_name(socket);
      return [site, doctype, name, field].filter(Boolean).join(":");
    }

    socket.on('frappe.model:subscribe', function (params) {
      if (!params) {
        // a missing payload would otherwise throw inside the handler
        return;
      }
      const room = get_model_room(params.doctype, params.name, params.field);

      console.log('frappe.model: Subscribing ' + socket.user + ' to room ' + room);
      socket.join(room);
    });
    // end frappe.model

    // frappe.chat
    socket.on("frappe.chat.room:subscribe", function (rooms) {
      const requested_rooms = Array.isArray(rooms) ? rooms : [rooms];

      for (const requested_room of requested_rooms) {
        console.log('frappe.chat: Subscribing ' + socket.user + ' to room ' + requested_room);
        const room = get_chat_room(socket, requested_room);

        console.log('frappe.chat: Subscribing ' + socket.user + ' to event ' + room);
        socket.join(room);
      }
    });

    socket.on("frappe.chat.message:typing", function (data) {
      if (!data) {
        return;
      }
      const user = data.user;
      const room = get_chat_room(socket, data.room);

      console.log('frappe.chat: Dispatching ' + user + ' typing to room ' + room);

      io.to(room).emit('frappe.chat.room:typing', {
        room: data.room,
        user: user
      });
    });
    // end frappe.chat

    // Resolve the session to a user and join the user and site-wide rooms.
    request.get(get_url(socket, '/api/method/frappe.async.get_user_info'))
      .type('form')
      .query({
        sid: sid
      })
      .end(function (err, res) {
        if (err) {
          console.log(err);
          return;
        }
        if (res.status === 200) {
          const user_info = res.body && res.body.message;
          if (!user_info) {
            console.log('frappe.async.get_user_info returned no user info');
            return;
          }
          socket.join(get_user_room(socket, user_info.user));
          socket.join(get_site_room(socket));
        }
      });

    socket.on('disconnect', function () {
      delete socket.files;
    });

    socket.on('task_subscribe', function (task_id) {
      socket.join(get_task_room(socket, task_id));
    });

    // Single handler: the room name must match the one joined by
    // 'task_subscribe' / 'progress_subscribe'.
    socket.on('task_unsubscribe', function (task_id) {
      socket.leave(get_task_room(socket, task_id));
    });

    socket.on('progress_subscribe', function (task_id) {
      socket.join(get_task_room(socket, task_id));
      send_existing_lines(task_id, socket);
    });

    socket.on('doc_subscribe', function (doctype, docname) {
      can_subscribe_doc({
        socket: socket,
        sid: sid,
        doctype: doctype,
        docname: docname,
        callback: function () {
          socket.join(get_doc_room(socket, doctype, docname));
        }
      });
    });

    socket.on('doc_unsubscribe', function (doctype, docname) {
      socket.leave(get_doc_room(socket, doctype, docname));
    });

    socket.on('doc_open', function (doctype, docname) {
      // show who is currently viewing the form
      can_subscribe_doc({
        socket: socket,
        sid: sid,
        doctype: doctype,
        docname: docname,
        callback: function () {
          socket.join(get_open_doc_room(socket, doctype, docname));

          send_viewers({
            socket: socket,
            doctype: doctype,
            docname: docname,
          });
        }
      });
    });

    socket.on('doc_close', function (doctype, docname) {
      // remove this user from the list of 'who is currently viewing the form'
      socket.leave(get_open_doc_room(socket, doctype, docname));
      send_viewers({
        socket: socket,
        doctype: doctype,
        docname: docname,
      });
    });

    // Bytes per slice; the upload is complete once slice * SLICE_SIZE >= size.
    const SLICE_SIZE = 24576;

    socket.on('upload-accept-slice', (data) => {
      try {
        // The name comes from the client: keep only its final path component
        // so it cannot escape the site's files directory.
        if (typeof data.name !== 'string') {
          throw new Error('Invalid file name');
        }
        const safe_name = path.basename(data.name);
        if (safe_name === '' || safe_name === '.' || safe_name === '..') {
          throw new Error('Invalid file name');
        }

        let upload = socket.files[data.name];
        if (!upload) {
          upload = Object.assign({}, files_struct, data);
          upload.data = [];
          socket.files[data.name] = upload;
        }

        //convert the ArrayBuffer to Buffer and save the data
        upload.data.push(Buffer.from(new Uint8Array(data.data)));
        upload.slice++;

        if (upload.slice * SLICE_SIZE >= upload.size) {
          // every slice has arrived: write the assembled file
          const file_buffer = Buffer.concat(upload.data);

          const file_url = path.join((upload.is_private ? 'private' : 'public'),
            'files', safe_name);
          const file_path = path.join('sites', get_site_name(socket), file_url);

          fs.writeFile(file_path, file_buffer, (err) => {
            // socket.files is removed on disconnect, which may happen mid-write
            if (socket.files) {
              delete socket.files[data.name];
            }
            if (err) {
              console.log(err);
              return socket.emit('upload-error', {
                error: err.message
              });
            }
            socket.emit('upload-end', {
              file_url: '/' + file_url
            });
          });
        } else {
          socket.emit('upload-request-slice', {
            currentSlice: upload.slice
          });
        }
      } catch (e) {
        console.log(e);
        socket.emit('upload-error', {
          error: e.message
        });
      }
    });
  });
  // Relay every message published on the "events" channel to the socket.io
  // room named in its payload. The payload is a JSON object with `room`,
  // `event` and `message` keys.
  subscriber.on("message", function (channel, message) {
    let payload;
    try {
      payload = JSON.parse(message);
    } catch (e) {
      // a malformed message must not take the whole server down
      console.log('Ignoring malformed message on channel ' + channel + ': ' + e.message);
      return;
    }
    if (!payload) {
      return;
    }
    io.to(payload.room).emit(payload.event, payload.message);
  });
  subscriber.subscribe("events");
  // Regular (non-subscriber) Redis connection used to read task logs. Created
  // on first use: a connection that has issued SUBSCRIBE only accepts
  // subscription commands, so the event subscriber cannot run HGETALL.
  let task_log_client = null;

  /**
   * Returns the shared Redis connection for task log reads, creating it on
   * first use with the same address the event subscriber connects to.
   */
  function get_task_log_client() {
    if (!task_log_client) {
      task_log_client = redis.createClient(conf.redis_socketio || conf.redis_async_broker_port);
    }
    return task_log_client;
  }

  /**
   * Emits the lines already stored under `task_log:<task_id>` to the task's
   * progress room as a 'task_progress' event.
   *
   * @param {string} task_id - id of the task whose log is replayed
   * @param {object} socket - socket used to resolve the site for the room name
   */
  function send_existing_lines(task_id, socket) {
    const room = get_task_room(socket, task_id);
    get_task_log_client().hgetall('task_log:' + task_id, function (err, lines) {
      if (err) {
        console.log(err);
        return;
      }
      io.to(room).emit('task_progress', {
        "task_id": task_id,
        "message": {
          "lines": lines
        }
      });
    });
  }
  /**
   * Room name for updates to one document: "<site>:doc:<doctype>/<docname>".
   */
  function get_doc_room(socket, doctype, docname) {
    const site = get_site_name(socket);
    const doc_key = doctype + '/' + docname;
    return site + ':doc:' + doc_key;
  }
  /**
   * Room name for sockets that currently have a document open:
   * "<site>:open_doc:<doctype>/<docname>".
   */
  function get_open_doc_room(socket, doctype, docname) {
    const site = get_site_name(socket);
    const doc_key = doctype + '/' + docname;
    return site + ':open_doc:' + doc_key;
  }
  /**
   * Room name for a single user of the socket's site: "<site>:user:<user>".
   */
  function get_user_room(socket, user) {
    const site = get_site_name(socket);
    return site + ':user:' + user;
  }
  /**
   * Room name shared by every socket of the socket's site: "<site>:all".
   */
  function get_site_room(socket) {
    const site = get_site_name(socket);
    return site + ':all';
  }
  /**
   * Room name for progress of one task: "<site>:task_progress:<task_id>".
   */
  function get_task_room(socket, task_id) {
    const site = get_site_name(socket);
    return site + ':task_progress:' + task_id;
  }
  // frappe.chat
  // If you're thinking on multi-site or anything, please
  // update frappe.async as well.
  /**
   * Room name for a chat room of the socket's site: "<site>:room:<room>".
   */
  function get_chat_room(socket, room) {
    const site = get_site_name(socket);
    return site + ":room:" + room;
  }
  /**
   * Resolves the site a socket belongs to.
   *
   * Order of precedence:
   *   1. the `x-frappe-site-name` request header,
   *   2. `conf.default_site` when the Host header points at localhost,
   *   3. the hostname of the Origin header,
   *   4. the hostname of the Host header.
   *
   * @param {object} socket - socket whose handshake headers are inspected
   * @returns {string|undefined} the site name
   */
  function get_site_name(socket) {
    const headers = socket.request.headers;
    // The Host header usually carries a port ("localhost:8000"); compare the
    // bare hostname so the localhost check also matches in that case.
    const hostname_from_host = get_hostname(headers.host);

    if (headers['x-frappe-site-name']) {
      return get_hostname(headers['x-frappe-site-name']);
    }
    if (['localhost', '127.0.0.1'].indexOf(hostname_from_host) !== -1 && conf.default_site) {
      // from currentsite.txt since host is localhost
      return conf.default_site;
    }
    if (headers.origin) {
      return get_hostname(headers.origin);
    }
    return hostname_from_host;
  }
  /**
   * Extracts the hostname from a URL ("http://host:port/path") or from a
   * "host[:port]" value such as a Host header.
   *
   * Bracketed IPv6 hosts ("[::1]:3000") are returned with their brackets
   * instead of being cut at the first colon.
   *
   * @param {string} url - URL or host[:port] string
   * @returns {string|undefined} hostname without port; undefined for empty
   *   or non-string input
   */
  function get_hostname(url) {
    if (!url || typeof url !== 'string') return undefined;

    let host = url;
    if (host.indexOf("://") > -1) {
      // "scheme://host:port/path" -> "host:port"
      host = host.split('/')[2];
    }

    if (host.charAt(0) === '[') {
      const closing = host.indexOf(']');
      if (closing !== -1) {
        return host.slice(0, closing + 1);
      }
    }

    const port_at = host.indexOf(':');
    return port_at === -1 ? host : host.slice(0, port_at);
  }
  /**
   * Builds a URL on the socket's origin by appending `path` to the Origin
   * header of the handshake request. A falsy `path` appends nothing.
   */
  function get_url(socket, path) {
    const suffix = path ? path : '';
    return socket.request.headers.origin + suffix;
  }
  /**
   * Asks frappe.async.can_subscribe_doc on the socket's origin about the given
   * document and calls `args.callback(err, res)` only when the response
   * status is 200.
   *
   * Does nothing when `args`, `args.doctype` or `args.docname` is missing.
   * A 403 response is ignored silently; other failures are logged.
   *
   * @param {object} args - { socket, sid, doctype, docname, callback }
   */
  function can_subscribe_doc(args) {
    if (!args || !args.doctype || !args.docname) {
      return;
    }

    const endpoint = get_url(args.socket, '/api/method/frappe.async.can_subscribe_doc');
    const query = {
      sid: args.sid,
      doctype: args.doctype,
      docname: args.docname
    };

    request.get(endpoint)
      .type('form')
      .query(query)
      .end((err, res) => {
        if (!res) {
          console.log("No response for doc_subscribe");
          return;
        }
        if (res.status === 403) {
          return;
        }
        if (err) {
          console.log(err);
          return;
        }
        if (res.status === 200) {
          args.callback(err, res);
          return;
        }
        console.log("Something went wrong", err, res);
      });
  }
  /**
   * Emits 'doc_viewers' to a document's open-doc room with the `user` of
   * every socket currently joined to that room.
   *
   * Does nothing unless `args.doctype` and `args.docname` are both set.
   *
   * @param {object} args - { socket, doctype, docname }
   */
  function send_viewers(args) {
    // send to doc room, 'users currently viewing this document'
    if (!(args && args.doctype && args.docname)) {
      return;
    }

    // open doc room
    const room = get_open_doc_room(args.socket, args.doctype, args.docname);

    const socketio_room = io.sockets.adapter.rooms[room] || {};

    // for compatibility with both v1.3.7 and 1.4.4
    const clients_dict = ("sockets" in socketio_room) ? socketio_room.sockets : socketio_room;

    // socket ids connected to this room; a Set keeps the membership test
    // constant-time instead of scanning an array for every socket
    const client_ids = new Set(Object.keys(clients_dict || {}));

    // io.sockets.sockets may be an array or an id-keyed object depending on
    // the socket.io version; Object.keys walks own entries of either shape
    const connected = io.sockets.sockets;
    const viewers = [];
    for (const key of Object.keys(connected)) {
      const s = connected[key];
      if (client_ids.has(s.id)) {
        // this socket is connected to the room
        viewers.push(s.user);
      }
    }

    // notify
    io.to(room).emit("doc_viewers", {
      doctype: args.doctype,
      docname: args.docname,
      viewers: viewers
    });
  }
  /**
   * Builds the server configuration.
   *
   * Starts from the built-in defaults, then overlays `config.json` and
   * `sites/common_site_config.json` (both relative to the working directory,
   * later file wins). Keys whose value is falsy in a config file are skipped,
   * so they never override an earlier value. When `sites/currentsite.txt`
   * exists, its trimmed content becomes `default_site`.
   *
   * @returns {object} the merged configuration
   * @throws {Error} when an existing config file cannot be read or parsed;
   *   the message names the offending file
   */
  function get_conf() {
    // defaults
    const conf = {
      redis_async_broker_port: 12311,
      socketio_port: 3000
    };

    const read_config = function (config_path) {
      if (!fs.existsSync(config_path)) {
        return;
      }

      let bench_config;
      try {
        bench_config = JSON.parse(fs.readFileSync(config_path, 'utf8'));
      } catch (e) {
        // name the file so a broken config is easy to locate at startup
        throw new Error('Could not read config file ' + config_path + ': ' + e.message);
      }

      for (const key of Object.keys(bench_config || {})) {
        if (bench_config[key]) {
          conf[key] = bench_config[key];
        }
      }
    };

    // get ports from bench/config.json
    read_config('config.json');
    read_config('sites/common_site_config.json');

    // detect current site
    if (fs.existsSync('sites/currentsite.txt')) {
      conf.default_site = fs.readFileSync('sites/currentsite.txt').toString().trim();
    }

    return conf;
  }