You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

socketio.js 7.2 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. var app = require('express')();
  2. var http = require('http').Server(app);
  3. var io = require('socket.io')(http);
  4. var cookie = require('cookie')
  5. var fs = require('fs');
  6. var redis = require("redis");
  7. var request = require('superagent');
  8. var conf = get_conf();
  9. var flags = {};
  10. var subscriber = redis.createClient(conf.redis_socketio || conf.redis_async_broker_port);
  11. // serve socketio
  12. http.listen(conf.socketio_port, function() {
  13. console.log('listening on *:', conf.socketio_port); //eslint-disable-line
  14. });
  15. // test route
  16. app.get('/', function(req, res) {
  17. res.sendfile('index.html');
  18. });
  19. // on socket connection
  20. io.on('connection', function(socket){
  21. if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
  22. return;
  23. }
  24. // console.log("connection!");
  25. var sid = cookie.parse(socket.request.headers.cookie).sid
  26. if(!sid) {
  27. return;
  28. }
  29. if(flags[sid]) {
  30. // throttle this function
  31. return;
  32. }
  33. flags[sid] = sid;
  34. setTimeout(function() { flags[sid] = null; }, 10000);
  35. socket.user = cookie.parse(socket.request.headers.cookie).user_id;
  36. // console.log("firing get_user_info");
  37. request.get(get_url(socket, '/api/method/frappe.async.get_user_info'))
  38. .type('form')
  39. .query({
  40. sid: sid
  41. })
  42. .end(function(err, res) {
  43. if(err) {
  44. console.log(err);
  45. return;
  46. }
  47. if(res.status == 200) {
  48. var room = get_user_room(socket, res.body.message.user);
  49. // console.log('joining', room);
  50. socket.join(room);
  51. socket.join(get_site_room(socket));
  52. }
  53. });
  54. socket.on('task_subscribe', function(task_id) {
  55. var room = get_task_room(socket, task_id);
  56. socket.join(room);
  57. });
  58. socket.on('task_unsubscribe', function(task_id) {
  59. var room = get_task_room(socket, task_id);
  60. socket.leave(room);
  61. });
  62. socket.on('progress_subscribe', function(task_id) {
  63. var room = get_task_room(socket, task_id);
  64. socket.join(room);
  65. send_existing_lines(task_id, socket);
  66. });
  67. socket.on('doc_subscribe', function(doctype, docname) {
  68. // console.log('trying to subscribe', doctype, docname)
  69. can_subscribe_doc({
  70. socket: socket,
  71. sid: sid,
  72. doctype: doctype,
  73. docname: docname,
  74. callback: function(err, res) {
  75. var room = get_doc_room(socket, doctype, docname);
  76. // console.log('joining', room)
  77. socket.join(room);
  78. }
  79. });
  80. });
  81. socket.on('doc_unsubscribe', function(doctype, docname) {
  82. var room = get_doc_room(socket, doctype, docname);
  83. socket.leave(room);
  84. });
  85. socket.on('task_unsubscribe', function(task_id) {
  86. var room = 'task:' + task_id;
  87. socket.leave(room);
  88. });
  89. socket.on('doc_open', function(doctype, docname) {
  90. // show who is currently viewing the form
  91. can_subscribe_doc({
  92. socket: socket,
  93. sid: sid,
  94. doctype: doctype,
  95. docname: docname,
  96. callback: function(err, res) {
  97. var room = get_open_doc_room(socket, doctype, docname);
  98. // console.log('joining', room)
  99. socket.join(room);
  100. send_viewers({
  101. socket: socket,
  102. doctype: doctype,
  103. docname: docname,
  104. });
  105. }
  106. });
  107. });
  108. socket.on('doc_close', function(doctype, docname) {
  109. // remove this user from the list of 'who is currently viewing the form'
  110. var room = get_open_doc_room(socket, doctype, docname);
  111. socket.leave(room);
  112. send_viewers({
  113. socket: socket,
  114. doctype: doctype,
  115. docname: docname,
  116. });
  117. });
  118. // socket.on('disconnect', function (arguments) {
  119. // console.log("user disconnected", arguments);
  120. // });
  121. });
  122. subscriber.on("message", function(channel, message) {
  123. message = JSON.parse(message);
  124. io.to(message.room).emit(message.event, message.message);
  125. // console.log(message.room, message.event, message.message)
  126. });
  127. subscriber.subscribe("events");
  128. function send_existing_lines(task_id, socket) {
  129. var room = get_task_room(socket, task_id);
  130. subscriber.hgetall('task_log:' + task_id, function(err, lines) {
  131. io.to(room).emit('task_progress', {
  132. "task_id": task_id,
  133. "message": {
  134. "lines": lines
  135. }
  136. });
  137. });
  138. }
  139. function get_doc_room(socket, doctype, docname) {
  140. return get_site_name(socket) + ':doc:'+ doctype + '/' + docname;
  141. }
  142. function get_open_doc_room(socket, doctype, docname) {
  143. return get_site_name(socket) + ':open_doc:'+ doctype + '/' + docname;
  144. }
  145. function get_user_room(socket, user) {
  146. return get_site_name(socket) + ':user:' + user;
  147. }
  148. function get_site_room(socket) {
  149. return get_site_name(socket) + ':all';
  150. }
  151. function get_task_room(socket, task_id) {
  152. return get_site_name(socket) + ':task_progress:' + task_id;
  153. }
  154. function get_site_name(socket) {
  155. if (socket.request.headers['x-frappe-site-name']) {
  156. return get_hostname(socket.request.headers['x-frappe-site-name']);
  157. }
  158. else if (['localhost', '127.0.0.1'].indexOf(socket.request.headers.host) !== -1
  159. && conf.default_site) {
  160. // from currentsite.txt since host is localhost
  161. return conf.default_site;
  162. }
  163. else if (socket.request.headers.origin) {
  164. return get_hostname(socket.request.headers.origin);
  165. }
  166. else {
  167. return get_hostname(socket.request.headers.host);
  168. }
  169. }
  170. function get_hostname(url) {
  171. if (!url) return undefined;
  172. if (url.indexOf("://") > -1) {
  173. url = url.split('/')[2];
  174. }
  175. return ( url.match(/:/g) ) ? url.slice( 0, url.indexOf(":") ) : url
  176. }
  177. function get_url(socket, path) {
  178. if (!path) {
  179. path = '';
  180. }
  181. return socket.request.headers.origin + path;
  182. }
  183. function can_subscribe_doc(args) {
  184. if(!args) return;
  185. if(!args.doctype || !args.docname) return;
  186. request.get(get_url(args.socket, '/api/method/frappe.async.can_subscribe_doc'))
  187. .type('form')
  188. .query({
  189. sid: args.sid,
  190. doctype: args.doctype,
  191. docname: args.docname
  192. })
  193. .end(function(err, res) {
  194. if (!res) {
  195. console.log("No response for doc_subscribe");
  196. } else if (res.status == 403) {
  197. return;
  198. } else if (err) {
  199. console.log(err);
  200. } else if (res.status == 200) {
  201. args.callback(err, res);
  202. } else {
  203. console.log("Something went wrong", err, res);
  204. }
  205. });
  206. }
  207. function send_viewers(args) {
  208. // send to doc room, 'users currently viewing this document'
  209. if (!(args && args.doctype && args.docname)) {
  210. return;
  211. }
  212. // open doc room
  213. var room = get_open_doc_room(args.socket, args.doctype, args.docname);
  214. var socketio_room = io.sockets.adapter.rooms[room] || {};
  215. // for compatibility with both v1.3.7 and 1.4.4
  216. var clients_dict = ("sockets" in socketio_room) ? socketio_room.sockets : socketio_room;
  217. // socket ids connected to this room
  218. var clients = Object.keys(clients_dict || {});
  219. var viewers = [];
  220. for (var i in io.sockets.sockets) {
  221. var s = io.sockets.sockets[i];
  222. if (clients.indexOf(s.id)!==-1) {
  223. // this socket is connected to the room
  224. viewers.push(s.user);
  225. }
  226. }
  227. // notify
  228. io.to(room).emit("doc_viewers", {
  229. doctype: args.doctype,
  230. docname: args.docname,
  231. viewers: viewers
  232. });
  233. }
  234. function get_conf() {
  235. // defaults
  236. var conf = {
  237. redis_async_broker_port: 12311,
  238. socketio_port: 3000
  239. };
  240. var read_config = function(path) {
  241. if(fs.existsSync(path)){
  242. var bench_config = JSON.parse(fs.readFileSync(path));
  243. for (var key in bench_config) {
  244. if (bench_config[key]) {
  245. conf[key] = bench_config[key];
  246. }
  247. }
  248. }
  249. }
  250. // get ports from bench/config.json
  251. read_config('config.json');
  252. read_config('sites/common_site_config.json');
  253. // detect current site
  254. if(fs.existsSync('sites/currentsite.txt')) {
  255. conf.default_site = fs.readFileSync('sites/currentsite.txt').toString().trim();
  256. }
  257. return conf;
  258. }