您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

socketio.js 8.6 KiB

10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
10 年前
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. var app = require('express')();
  2. var http = require('http').Server(app);
  3. var io = require('socket.io')(http);
  4. var cookie = require('cookie')
  5. var fs = require('fs');
  6. var path = require('path');
  7. var redis = require("redis");
  8. var request = require('superagent');
  9. var conf = get_conf();
  10. var flags = {};
  11. var files_struct = {
  12. name: null,
  13. type: null,
  14. size: 0,
  15. data: [],
  16. slice: 0,
  17. site_name: null,
  18. is_private: 0
  19. };
  20. var subscriber = redis.createClient(conf.redis_socketio || conf.redis_async_broker_port);
  21. // serve socketio
  22. http.listen(conf.socketio_port, function() {
  23. console.log('listening on *:', conf.socketio_port); //eslint-disable-line
  24. });
  25. // test route
  26. app.get('/', function(req, res) {
  27. res.sendfile('index.html');
  28. });
  29. // on socket connection
  30. io.on('connection', function(socket) {
  31. if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
  32. return;
  33. }
  34. // console.log("connection!");
  35. if (!socket.request.headers.cookie) {
  36. return;
  37. }
  38. var sid = cookie.parse(socket.request.headers.cookie).sid
  39. if(!sid) {
  40. return;
  41. }
  42. if(flags[sid]) {
  43. // throttle this function
  44. return;
  45. }
  46. flags[sid] = sid;
  47. setTimeout(function() { flags[sid] = null; }, 10000);
  48. socket.user = cookie.parse(socket.request.headers.cookie).user_id;
  49. socket.files = {};
  50. // console.log("firing get_user_info");
  51. request.get(get_url(socket, '/api/method/frappe.async.get_user_info'))
  52. .type('form')
  53. .query({
  54. sid: sid
  55. })
  56. .end(function(err, res) {
  57. if(err) {
  58. console.log(err);
  59. return;
  60. }
  61. if(res.status == 200) {
  62. var room = get_user_room(socket, res.body.message.user);
  63. // console.log('joining', room);
  64. socket.join(room);
  65. socket.join(get_site_room(socket));
  66. }
  67. });
  68. socket.on('disconnect', function() {
  69. delete socket.files;
  70. })
  71. socket.on('task_subscribe', function(task_id) {
  72. var room = get_task_room(socket, task_id);
  73. socket.join(room);
  74. });
  75. socket.on('task_unsubscribe', function(task_id) {
  76. var room = get_task_room(socket, task_id);
  77. socket.leave(room);
  78. });
  79. socket.on('progress_subscribe', function(task_id) {
  80. var room = get_task_room(socket, task_id);
  81. socket.join(room);
  82. send_existing_lines(task_id, socket);
  83. });
  84. socket.on('doc_subscribe', function(doctype, docname) {
  85. // console.log('trying to subscribe', doctype, docname)
  86. can_subscribe_doc({
  87. socket: socket,
  88. sid: sid,
  89. doctype: doctype,
  90. docname: docname,
  91. callback: function(err, res) {
  92. var room = get_doc_room(socket, doctype, docname);
  93. // console.log('joining', room)
  94. socket.join(room);
  95. }
  96. });
  97. });
  98. socket.on('doc_unsubscribe', function(doctype, docname) {
  99. var room = get_doc_room(socket, doctype, docname);
  100. socket.leave(room);
  101. });
  102. socket.on('task_unsubscribe', function(task_id) {
  103. var room = 'task:' + task_id;
  104. socket.leave(room);
  105. });
  106. socket.on('doc_open', function(doctype, docname) {
  107. // show who is currently viewing the form
  108. can_subscribe_doc({
  109. socket: socket,
  110. sid: sid,
  111. doctype: doctype,
  112. docname: docname,
  113. callback: function(err, res) {
  114. var room = get_open_doc_room(socket, doctype, docname);
  115. // console.log('joining', room)
  116. socket.join(room);
  117. send_viewers({
  118. socket: socket,
  119. doctype: doctype,
  120. docname: docname,
  121. });
  122. }
  123. });
  124. });
  125. socket.on('doc_close', function(doctype, docname) {
  126. // remove this user from the list of 'who is currently viewing the form'
  127. var room = get_open_doc_room(socket, doctype, docname);
  128. socket.leave(room);
  129. send_viewers({
  130. socket: socket,
  131. doctype: doctype,
  132. docname: docname,
  133. });
  134. });
  135. socket.on('upload-accept-slice', (data) => {
  136. try {
  137. if (!socket.files[data.name]) {
  138. socket.files[data.name] = Object.assign({}, files_struct, data);
  139. socket.files[data.name].data = [];
  140. }
  141. //convert the ArrayBuffer to Buffer
  142. data.data = new Buffer(new Uint8Array(data.data));
  143. //save the data
  144. socket.files[data.name].data.push(data.data);
  145. socket.files[data.name].slice++;
  146. if (socket.files[data.name].slice * 24576 >= socket.files[data.name].size) {
  147. // do something with the data
  148. var fileBuffer = Buffer.concat(socket.files[data.name].data);
  149. const file_url = path.join((socket.files[data.name].is_private ? 'private' : 'public'),
  150. 'files', data.name);
  151. const file_path = path.join('sites', get_site_name(socket), file_url);
  152. fs.writeFile(file_path, fileBuffer, (err) => {
  153. delete socket.files[data.name];
  154. if (err) return socket.emit('upload error');
  155. socket.emit('upload-end', {
  156. file_url: '/' + file_url
  157. });
  158. });
  159. } else {
  160. socket.emit('upload-request-slice', {
  161. currentSlice: socket.files[data.name].slice
  162. });
  163. }
  164. } catch (e) {
  165. console.log(e);
  166. socket.emit('upload-error', {
  167. error: e.message
  168. });
  169. }
  170. });
  171. });
  172. subscriber.on("message", function(channel, message) {
  173. message = JSON.parse(message);
  174. io.to(message.room).emit(message.event, message.message);
  175. // console.log(message.room, message.event, message.message)
  176. });
  177. subscriber.subscribe("events");
  178. function send_existing_lines(task_id, socket) {
  179. var room = get_task_room(socket, task_id);
  180. subscriber.hgetall('task_log:' + task_id, function(err, lines) {
  181. io.to(room).emit('task_progress', {
  182. "task_id": task_id,
  183. "message": {
  184. "lines": lines
  185. }
  186. });
  187. });
  188. }
  189. function get_doc_room(socket, doctype, docname) {
  190. return get_site_name(socket) + ':doc:'+ doctype + '/' + docname;
  191. }
  192. function get_open_doc_room(socket, doctype, docname) {
  193. return get_site_name(socket) + ':open_doc:'+ doctype + '/' + docname;
  194. }
  195. function get_user_room(socket, user) {
  196. return get_site_name(socket) + ':user:' + user;
  197. }
  198. function get_site_room(socket) {
  199. return get_site_name(socket) + ':all';
  200. }
  201. function get_task_room(socket, task_id) {
  202. return get_site_name(socket) + ':task_progress:' + task_id;
  203. }
  204. function get_site_name(socket) {
  205. if (socket.request.headers['x-frappe-site-name']) {
  206. return get_hostname(socket.request.headers['x-frappe-site-name']);
  207. }
  208. else if (['localhost', '127.0.0.1'].indexOf(socket.request.headers.host) !== -1
  209. && conf.default_site) {
  210. // from currentsite.txt since host is localhost
  211. return conf.default_site;
  212. }
  213. else if (socket.request.headers.origin) {
  214. return get_hostname(socket.request.headers.origin);
  215. }
  216. else {
  217. return get_hostname(socket.request.headers.host);
  218. }
  219. }
  220. function get_hostname(url) {
  221. if (!url) return undefined;
  222. if (url.indexOf("://") > -1) {
  223. url = url.split('/')[2];
  224. }
  225. return ( url.match(/:/g) ) ? url.slice( 0, url.indexOf(":") ) : url
  226. }
  227. function get_url(socket, path) {
  228. if (!path) {
  229. path = '';
  230. }
  231. return socket.request.headers.origin + path;
  232. }
  233. function can_subscribe_doc(args) {
  234. if(!args) return;
  235. if(!args.doctype || !args.docname) return;
  236. request.get(get_url(args.socket, '/api/method/frappe.async.can_subscribe_doc'))
  237. .type('form')
  238. .query({
  239. sid: args.sid,
  240. doctype: args.doctype,
  241. docname: args.docname
  242. })
  243. .end(function(err, res) {
  244. if (!res) {
  245. console.log("No response for doc_subscribe");
  246. } else if (res.status == 403) {
  247. return;
  248. } else if (err) {
  249. console.log(err);
  250. } else if (res.status == 200) {
  251. args.callback(err, res);
  252. } else {
  253. console.log("Something went wrong", err, res);
  254. }
  255. });
  256. }
  257. function send_viewers(args) {
  258. // send to doc room, 'users currently viewing this document'
  259. if (!(args && args.doctype && args.docname)) {
  260. return;
  261. }
  262. // open doc room
  263. var room = get_open_doc_room(args.socket, args.doctype, args.docname);
  264. var socketio_room = io.sockets.adapter.rooms[room] || {};
  265. // for compatibility with both v1.3.7 and 1.4.4
  266. var clients_dict = ("sockets" in socketio_room) ? socketio_room.sockets : socketio_room;
  267. // socket ids connected to this room
  268. var clients = Object.keys(clients_dict || {});
  269. var viewers = [];
  270. for (var i in io.sockets.sockets) {
  271. var s = io.sockets.sockets[i];
  272. if (clients.indexOf(s.id)!==-1) {
  273. // this socket is connected to the room
  274. viewers.push(s.user);
  275. }
  276. }
  277. // notify
  278. io.to(room).emit("doc_viewers", {
  279. doctype: args.doctype,
  280. docname: args.docname,
  281. viewers: viewers
  282. });
  283. }
  284. function get_conf() {
  285. // defaults
  286. var conf = {
  287. redis_async_broker_port: 12311,
  288. socketio_port: 3000
  289. };
  290. var read_config = function(path) {
  291. if(fs.existsSync(path)){
  292. var bench_config = JSON.parse(fs.readFileSync(path));
  293. for (var key in bench_config) {
  294. if (bench_config[key]) {
  295. conf[key] = bench_config[key];
  296. }
  297. }
  298. }
  299. }
  300. // get ports from bench/config.json
  301. read_config('config.json');
  302. read_config('sites/common_site_config.json');
  303. // detect current site
  304. if(fs.existsSync('sites/currentsite.txt')) {
  305. conf.default_site = fs.readFileSync('sites/currentsite.txt').toString().trim();
  306. }
  307. return conf;
  308. }