You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

318 line
7.3 KiB

  1. const cookie = require("cookie");
  2. const request = require("superagent");
  3. const { get_conf, get_redis_subscriber } = require("./node_utils");
  4. const conf = get_conf();
  5. const log = console.log; // eslint-disable-line
  6. const subscriber = get_redis_subscriber();
  7. const io = require("socket.io")(conf.socketio_port, {
  8. cors: {
  9. // Should be fine since we are ensuring whether hostname and origin are same before adding setting listeners for s socket
  10. origin: true,
  11. credentials: true,
  12. },
  13. });
  14. // on socket connection
  15. io.on("connection", function (socket) {
  16. if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
  17. return;
  18. }
  19. if (!socket.request.headers.cookie) {
  20. return;
  21. }
  22. const sid = cookie.parse(socket.request.headers.cookie).sid;
  23. if (!sid) {
  24. return;
  25. }
  26. socket.user = cookie.parse(socket.request.headers.cookie).user_id;
  27. socket.on("task_subscribe", function (task_id) {
  28. var room = get_task_room(socket, task_id);
  29. socket.join(room);
  30. });
  31. let retries = 0;
  32. let join_user_room = () => {
  33. request
  34. .get(get_url(socket, "/api/method/influxframework.realtime.get_user_info"))
  35. .type("form")
  36. .query({
  37. sid: sid,
  38. })
  39. .then((res) => {
  40. const room = get_user_room(socket, res.body.message.user);
  41. socket.join(room);
  42. socket.join(get_site_room(socket));
  43. })
  44. .catch((e) => {
  45. if (e.code === "ECONNREFUSED" && retries < 5) {
  46. // retry after 1s
  47. retries += 1;
  48. return setTimeout(join_user_room, 1000);
  49. }
  50. log(`Unable to join user room. ${e}`);
  51. });
  52. };
  53. join_user_room();
  54. socket.on("task_unsubscribe", function (task_id) {
  55. var room = get_task_room(socket, task_id);
  56. socket.leave(room);
  57. });
  58. socket.on("task_unsubscribe", function (task_id) {
  59. var room = "task:" + task_id;
  60. socket.leave(room);
  61. });
  62. socket.on("progress_subscribe", function (task_id) {
  63. var room = get_task_room(socket, task_id);
  64. socket.join(room);
  65. send_existing_lines(task_id, socket);
  66. });
  67. socket.on("doc_subscribe", function (doctype, docname) {
  68. can_subscribe_doc({
  69. socket,
  70. sid,
  71. doctype,
  72. docname,
  73. callback: () => {
  74. var room = get_doc_room(socket, doctype, docname);
  75. socket.join(room);
  76. },
  77. });
  78. });
  79. socket.on("doc_unsubscribe", function (doctype, docname) {
  80. var room = get_doc_room(socket, doctype, docname);
  81. socket.leave(room);
  82. });
  83. socket.on("doc_open", function (doctype, docname) {
  84. can_subscribe_doc({
  85. socket,
  86. sid,
  87. doctype,
  88. docname,
  89. callback: () => {
  90. var room = get_open_doc_room(socket, doctype, docname);
  91. socket.join(room);
  92. // show who is currently viewing the form
  93. send_users(
  94. {
  95. socket: socket,
  96. doctype: doctype,
  97. docname: docname,
  98. },
  99. "view"
  100. );
  101. // show who is currently typing on the form
  102. send_users(
  103. {
  104. socket: socket,
  105. doctype: doctype,
  106. docname: docname,
  107. },
  108. "type"
  109. );
  110. },
  111. });
  112. });
  113. socket.on("doc_close", function (doctype, docname) {
  114. // remove this user from the list of 'who is currently viewing the form'
  115. var room = get_open_doc_room(socket, doctype, docname);
  116. socket.leave(room);
  117. send_users(
  118. {
  119. socket: socket,
  120. doctype: doctype,
  121. docname: docname,
  122. },
  123. "view"
  124. );
  125. });
  126. socket.on("doc_typing", function (doctype, docname) {
  127. // show users that are currently typing on the form
  128. const room = get_typing_room(socket, doctype, docname);
  129. socket.join(room);
  130. send_users(
  131. {
  132. socket: socket,
  133. doctype: doctype,
  134. docname: docname,
  135. },
  136. "type"
  137. );
  138. });
  139. socket.on("doc_typing_stopped", function (doctype, docname) {
  140. // remove this user from the list of users currently typing on the form'
  141. const room = get_typing_room(socket, doctype, docname);
  142. socket.leave(room);
  143. send_users(
  144. {
  145. socket: socket,
  146. doctype: doctype,
  147. docname: docname,
  148. },
  149. "type"
  150. );
  151. });
  152. socket.on("open_in_editor", (data) => {
  153. let s = get_redis_subscriber("redis_socketio");
  154. s.publish("open_in_editor", JSON.stringify(data));
  155. });
  156. });
  157. subscriber.on("message", function (_channel, message) {
  158. message = JSON.parse(message);
  159. if (message.room) {
  160. io.to(message.room).emit(message.event, message.message);
  161. } else {
  162. io.emit(message.event, message.message);
  163. }
  164. });
  165. subscriber.subscribe("events");
  166. function send_existing_lines(task_id, socket) {
  167. var room = get_task_room(socket, task_id);
  168. subscriber.hgetall("task_log:" + task_id, function (_err, lines) {
  169. io.to(room).emit("task_progress", {
  170. task_id: task_id,
  171. message: {
  172. lines: lines,
  173. },
  174. });
  175. });
  176. }
  177. function get_doc_room(socket, doctype, docname) {
  178. return get_site_name(socket) + ":doc:" + doctype + "/" + docname;
  179. }
  180. function get_open_doc_room(socket, doctype, docname) {
  181. return get_site_name(socket) + ":open_doc:" + doctype + "/" + docname;
  182. }
  183. function get_typing_room(socket, doctype, docname) {
  184. return get_site_name(socket) + ":typing:" + doctype + "/" + docname;
  185. }
  186. function get_user_room(socket, user) {
  187. return get_site_name(socket) + ":user:" + user;
  188. }
  189. function get_site_room(socket) {
  190. return get_site_name(socket) + ":all";
  191. }
  192. function get_task_room(socket, task_id) {
  193. return get_site_name(socket) + ":task_progress:" + task_id;
  194. }
  195. function get_site_name(socket) {
  196. var hostname_from_host = get_hostname(socket.request.headers.host);
  197. if (socket.request.headers["x-influxframework-site-name"]) {
  198. return get_hostname(socket.request.headers["x-influxframework-site-name"]);
  199. } else if (
  200. ["localhost", "127.0.0.1"].indexOf(hostname_from_host) !== -1 &&
  201. conf.default_site
  202. ) {
  203. // from currentsite.txt since host is localhost
  204. return conf.default_site;
  205. } else if (socket.request.headers.origin) {
  206. return get_hostname(socket.request.headers.origin);
  207. } else {
  208. return get_hostname(socket.request.headers.host);
  209. }
  210. }
  211. function get_hostname(url) {
  212. if (!url) return undefined;
  213. if (url.indexOf("://") > -1) {
  214. url = url.split("/")[2];
  215. }
  216. return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url;
  217. }
  218. function get_url(socket, path) {
  219. if (!path) {
  220. path = "";
  221. }
  222. return socket.request.headers.origin + path;
  223. }
  224. function can_subscribe_doc(args) {
  225. if (!args) return;
  226. if (!args.doctype || !args.docname) return;
  227. request
  228. .get(get_url(args.socket, "/api/method/influxframework.realtime.can_subscribe_doc"))
  229. .type("form")
  230. .query({
  231. sid: args.sid,
  232. doctype: args.doctype,
  233. docname: args.docname,
  234. })
  235. .end(function (err, res) {
  236. if (!res) {
  237. log("No response for doc_subscribe");
  238. } else if (res.status == 403) {
  239. return;
  240. } else if (err) {
  241. log(err);
  242. } else if (res.status == 200) {
  243. args.callback(err, res);
  244. } else {
  245. log("Something went wrong", err, res);
  246. }
  247. });
  248. }
  249. function send_users(args, action) {
  250. if (!(args && args.doctype && args.docname)) {
  251. return;
  252. }
  253. const open_doc_room = get_open_doc_room(args.socket, args.doctype, args.docname);
  254. const room =
  255. action == "view"
  256. ? open_doc_room
  257. : get_typing_room(args.socket, args.doctype, args.docname);
  258. const clients = Array.from(io.sockets.adapter.rooms.get(room) || []);
  259. let users = [];
  260. io.sockets.sockets.forEach((sock) => {
  261. if (clients.includes(sock.id)) {
  262. users.push(sock.user);
  263. }
  264. });
  265. const emit_event = action == "view" ? "doc_viewers" : "doc_typers";
  266. // notify
  267. io.to(open_doc_room).emit(emit_event, {
  268. doctype: args.doctype,
  269. docname: args.docname,
  270. users: Array.from(new Set(users)),
  271. });
  272. }