|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317 |
- const cookie = require("cookie");
- const request = require("superagent");
-
- const { get_conf, get_redis_subscriber } = require("./node_utils");
- const conf = get_conf();
- const log = console.log; // eslint-disable-line
- const subscriber = get_redis_subscriber();
-
- const io = require("socket.io")(conf.socketio_port, {
- cors: {
- // Should be fine since we are ensuring whether hostname and origin are same before adding setting listeners for s socket
- origin: true,
- credentials: true,
- },
- });
-
- // on socket connection
- io.on("connection", function (socket) {
- if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
- return;
- }
-
- if (!socket.request.headers.cookie) {
- return;
- }
-
- const sid = cookie.parse(socket.request.headers.cookie).sid;
- if (!sid) {
- return;
- }
-
- socket.user = cookie.parse(socket.request.headers.cookie).user_id;
-
- socket.on("task_subscribe", function (task_id) {
- var room = get_task_room(socket, task_id);
- socket.join(room);
- });
-
- let retries = 0;
- let join_user_room = () => {
- request
- .get(get_url(socket, "/api/method/influxframework.realtime.get_user_info"))
- .type("form")
- .query({
- sid: sid,
- })
- .then((res) => {
- const room = get_user_room(socket, res.body.message.user);
- socket.join(room);
- socket.join(get_site_room(socket));
- })
- .catch((e) => {
- if (e.code === "ECONNREFUSED" && retries < 5) {
- // retry after 1s
- retries += 1;
- return setTimeout(join_user_room, 1000);
- }
- log(`Unable to join user room. ${e}`);
- });
- };
-
- join_user_room();
-
- socket.on("task_unsubscribe", function (task_id) {
- var room = get_task_room(socket, task_id);
- socket.leave(room);
- });
-
- socket.on("task_unsubscribe", function (task_id) {
- var room = "task:" + task_id;
- socket.leave(room);
- });
-
- socket.on("progress_subscribe", function (task_id) {
- var room = get_task_room(socket, task_id);
- socket.join(room);
- send_existing_lines(task_id, socket);
- });
-
- socket.on("doc_subscribe", function (doctype, docname) {
- can_subscribe_doc({
- socket,
- sid,
- doctype,
- docname,
- callback: () => {
- var room = get_doc_room(socket, doctype, docname);
- socket.join(room);
- },
- });
- });
-
- socket.on("doc_unsubscribe", function (doctype, docname) {
- var room = get_doc_room(socket, doctype, docname);
- socket.leave(room);
- });
-
- socket.on("doc_open", function (doctype, docname) {
- can_subscribe_doc({
- socket,
- sid,
- doctype,
- docname,
- callback: () => {
- var room = get_open_doc_room(socket, doctype, docname);
- socket.join(room);
-
- // show who is currently viewing the form
- send_users(
- {
- socket: socket,
- doctype: doctype,
- docname: docname,
- },
- "view"
- );
-
- // show who is currently typing on the form
- send_users(
- {
- socket: socket,
- doctype: doctype,
- docname: docname,
- },
- "type"
- );
- },
- });
- });
-
- socket.on("doc_close", function (doctype, docname) {
- // remove this user from the list of 'who is currently viewing the form'
- var room = get_open_doc_room(socket, doctype, docname);
- socket.leave(room);
- send_users(
- {
- socket: socket,
- doctype: doctype,
- docname: docname,
- },
- "view"
- );
- });
-
- socket.on("doc_typing", function (doctype, docname) {
- // show users that are currently typing on the form
- const room = get_typing_room(socket, doctype, docname);
- socket.join(room);
-
- send_users(
- {
- socket: socket,
- doctype: doctype,
- docname: docname,
- },
- "type"
- );
- });
-
- socket.on("doc_typing_stopped", function (doctype, docname) {
- // remove this user from the list of users currently typing on the form'
- const room = get_typing_room(socket, doctype, docname);
- socket.leave(room);
-
- send_users(
- {
- socket: socket,
- doctype: doctype,
- docname: docname,
- },
- "type"
- );
- });
-
- socket.on("open_in_editor", (data) => {
- let s = get_redis_subscriber("redis_socketio");
- s.publish("open_in_editor", JSON.stringify(data));
- });
- });
-
- subscriber.on("message", function (_channel, message) {
- message = JSON.parse(message);
-
- if (message.room) {
- io.to(message.room).emit(message.event, message.message);
- } else {
- io.emit(message.event, message.message);
- }
- });
-
- subscriber.subscribe("events");
-
- function send_existing_lines(task_id, socket) {
- var room = get_task_room(socket, task_id);
- subscriber.hgetall("task_log:" + task_id, function (_err, lines) {
- io.to(room).emit("task_progress", {
- task_id: task_id,
- message: {
- lines: lines,
- },
- });
- });
- }
-
- function get_doc_room(socket, doctype, docname) {
- return get_site_name(socket) + ":doc:" + doctype + "/" + docname;
- }
-
- function get_open_doc_room(socket, doctype, docname) {
- return get_site_name(socket) + ":open_doc:" + doctype + "/" + docname;
- }
-
- function get_typing_room(socket, doctype, docname) {
- return get_site_name(socket) + ":typing:" + doctype + "/" + docname;
- }
-
- function get_user_room(socket, user) {
- return get_site_name(socket) + ":user:" + user;
- }
-
- function get_site_room(socket) {
- return get_site_name(socket) + ":all";
- }
-
- function get_task_room(socket, task_id) {
- return get_site_name(socket) + ":task_progress:" + task_id;
- }
-
- function get_site_name(socket) {
- var hostname_from_host = get_hostname(socket.request.headers.host);
-
- if (socket.request.headers["x-influxframework-site-name"]) {
- return get_hostname(socket.request.headers["x-influxframework-site-name"]);
- } else if (
- ["localhost", "127.0.0.1"].indexOf(hostname_from_host) !== -1 &&
- conf.default_site
- ) {
- // from currentsite.txt since host is localhost
- return conf.default_site;
- } else if (socket.request.headers.origin) {
- return get_hostname(socket.request.headers.origin);
- } else {
- return get_hostname(socket.request.headers.host);
- }
- }
-
- function get_hostname(url) {
- if (!url) return undefined;
- if (url.indexOf("://") > -1) {
- url = url.split("/")[2];
- }
- return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url;
- }
-
- function get_url(socket, path) {
- if (!path) {
- path = "";
- }
- return socket.request.headers.origin + path;
- }
-
- function can_subscribe_doc(args) {
- if (!args) return;
- if (!args.doctype || !args.docname) return;
- request
- .get(get_url(args.socket, "/api/method/influxframework.realtime.can_subscribe_doc"))
- .type("form")
- .query({
- sid: args.sid,
- doctype: args.doctype,
- docname: args.docname,
- })
- .end(function (err, res) {
- if (!res) {
- log("No response for doc_subscribe");
- } else if (res.status == 403) {
- return;
- } else if (err) {
- log(err);
- } else if (res.status == 200) {
- args.callback(err, res);
- } else {
- log("Something went wrong", err, res);
- }
- });
- }
-
- function send_users(args, action) {
- if (!(args && args.doctype && args.docname)) {
- return;
- }
-
- const open_doc_room = get_open_doc_room(args.socket, args.doctype, args.docname);
-
- const room =
- action == "view"
- ? open_doc_room
- : get_typing_room(args.socket, args.doctype, args.docname);
-
- const clients = Array.from(io.sockets.adapter.rooms.get(room) || []);
-
- let users = [];
- io.sockets.sockets.forEach((sock) => {
- if (clients.includes(sock.id)) {
- users.push(sock.user);
- }
- });
-
- const emit_event = action == "view" ? "doc_viewers" : "doc_typers";
-
- // notify
- io.to(open_doc_room).emit(emit_event, {
- doctype: args.doctype,
- docname: args.docname,
- users: Array.from(new Set(users)),
- });
- }
|