|
- var app = require('express')();
- var http = require('http').Server(app);
- var io = require('socket.io')(http);
- var cookie = require('cookie')
- var fs = require('fs');
- var redis = require("redis");
- var request = require('superagent');
-
- var conf = get_conf();
- var subscriber = redis.createClient(conf.redis_socketio || conf.redis_async_broker_port);
-
- // serve socketio
- http.listen(conf.socketio_port, function(){
- console.log('listening on *:', conf.socketio_port);
- });
-
- // test route
- app.get('/', function(req, res){
- res.sendfile('index.html');
- });
-
- // on socket connection
- io.on('connection', function(socket){
- if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
- return;
- }
-
- // console.log("connection!");
- var sid = cookie.parse(socket.request.headers.cookie).sid
- if(!sid) {
- return;
- }
-
- socket.user = cookie.parse(socket.request.headers.cookie).user_id;
-
- // console.log("firing get_user_info");
- request.get(get_url(socket, '/api/method/frappe.async.get_user_info'))
- .type('form')
- .send({
- sid: sid
- })
- .end(function(err, res) {
- if(err) {
- console.log(err);
- return;
- }
- if(res.status == 200) {
- var room = get_user_room(socket, res.body.message.user);
- // console.log('joining', room);
- socket.join(room);
- socket.join(get_site_room(socket));
- }
- });
-
- socket.on('task_subscribe', function(task_id) {
- var room = get_task_room(socket, task_id);
- socket.join(room);
- });
-
- socket.on('task_unsubscribe', function(task_id) {
- var room = get_task_room(socket, task_id);
- socket.leave(room);
- });
-
- socket.on('progress_subscribe', function(task_id) {
- var room = get_task_room(socket, task_id);
- socket.join(room);
- send_existing_lines(task_id, socket);
- });
-
- socket.on('doc_subscribe', function(doctype, docname) {
- // console.log('trying to subscribe', doctype, docname)
- can_subscribe_doc({
- socket: socket,
- sid: sid,
- doctype: doctype,
- docname: docname,
- callback: function(err, res) {
- var room = get_doc_room(socket, doctype, docname);
- // console.log('joining', room)
- socket.join(room);
- }
- });
- });
-
- socket.on('doc_unsubscribe', function(doctype, docname) {
- var room = get_doc_room(socket, doctype, docname);
- socket.leave(room);
- });
-
- socket.on('task_unsubscribe', function(task_id) {
- var room = 'task:' + task_id;
- socket.leave(room);
- });
-
- socket.on('doc_open', function(doctype, docname) {
- // show who is currently viewing the form
- can_subscribe_doc({
- socket: socket,
- sid: sid,
- doctype: doctype,
- docname: docname,
- callback: function(err, res) {
- var room = get_open_doc_room(socket, doctype, docname);
- // console.log('joining', room)
- socket.join(room);
-
- send_viewers({
- socket: socket,
- doctype: doctype,
- docname: docname,
- });
- }
- });
- });
-
- socket.on('doc_close', function(doctype, docname) {
- // remove this user from the list of 'who is currently viewing the form'
- var room = get_open_doc_room(socket, doctype, docname);
- socket.leave(room);
- send_viewers({
- socket: socket,
- doctype: doctype,
- docname: docname,
- });
- });
-
- // socket.on('disconnect', function (arguments) {
- // console.log("user disconnected", arguments);
- // });
- });
-
- subscriber.on("message", function(channel, message) {
- message = JSON.parse(message);
- io.to(message.room).emit(message.event, message.message);
- // console.log(message.room, message.event, message.message)
- });
-
- subscriber.subscribe("events");
-
- function send_existing_lines(task_id, socket) {
- var room = get_task_room(socket, task_id);
- subscriber.hgetall('task_log:' + task_id, function(err, lines) {
- io.to(room).emit('task_progress', {
- "task_id": task_id,
- "message": {
- "lines": lines
- }
- });
- });
- }
-
- function get_doc_room(socket, doctype, docname) {
- return get_site_name(socket) + ':doc:'+ doctype + '/' + docname;
- }
-
- function get_open_doc_room(socket, doctype, docname) {
- return get_site_name(socket) + ':open_doc:'+ doctype + '/' + docname;
- }
-
- function get_user_room(socket, user) {
- return get_site_name(socket) + ':user:' + user;
- }
-
- function get_site_room(socket) {
- return get_site_name(socket) + ':all';
- }
-
- function get_task_room(socket, task_id) {
- return get_site_name(socket) + ':task_progress:' + task_id;
- }
-
- function get_site_name(socket) {
- if (conf.default_site) {
- return conf.default_site;
- }
- else if (socket.request.headers['x-frappe-site-name']) {
- return get_hostname(socket.request.headers['x-frappe-site-name']);
- }
- else if (socket.request.headers.origin) {
- return get_hostname(socket.request.headers.origin);
- }
- else {
- return get_hostname(socket.request.headers.host);
- }
- }
-
- function get_hostname(url) {
- if (!url) return undefined;
- if (url.indexOf("://") > -1) {
- url = url.split('/')[2];
- }
- return ( url.match(/:/g) ) ? url.slice( 0, url.indexOf(":") ) : url
- }
-
- function get_url(socket, path) {
- if (!path) {
- path = '';
- }
- return socket.request.headers.origin + path;
- }
-
- function can_subscribe_doc(args) {
- request.get(get_url(args.socket, '/api/method/frappe.async.can_subscribe_doc'))
- .type('form')
- .send({
- sid: args.sid,
- doctype: args.doctype,
- docname: args.docname
- })
- .end(function(err, res) {
- if (!res) {
- console.log("No response for doc_subscribe");
-
- } else if (res.status == 403) {
- return;
-
- } else if (err) {
- console.log(err);
-
- } else if (res.status == 200) {
- args.callback(err, res);
-
- } else {
- console.log("Something went wrong", err, res);
- }
- });
- }
-
- function send_viewers(args) {
- // send to doc room, 'users currently viewing this document'
- if (!(args && args.doctype && args.docname)) {
- return;
- }
-
- // open doc room
- var room = get_open_doc_room(args.socket, args.doctype, args.docname);
-
- var socketio_room = io.sockets.adapter.rooms[room] || {};
-
- // for compatibility with both v1.3.7 and 1.4.4
- var clients_dict = ("sockets" in socketio_room) ? socketio_room.sockets : socketio_room;
-
- // socket ids connected to this room
- var clients = Object.keys(clients_dict || {});
-
- var viewers = [];
- for (var i in io.sockets.sockets) {
- var s = io.sockets.sockets[i];
- if (clients.indexOf(s.id)!==-1) {
- // this socket is connected to the room
- viewers.push(s.user);
- }
- }
-
- // notify
- io.to(room).emit("doc_viewers", {
- doctype: args.doctype,
- docname: args.docname,
- viewers: viewers
- });
- }
-
- function get_conf() {
- // defaults
- var conf = {
- redis_async_broker_port: 12311,
- socketio_port: 3000
- };
-
- var read_config = function(path) {
- if(fs.existsSync(path)){
- var bench_config = JSON.parse(fs.readFileSync(path));
- for (var key in bench_config) {
- if (bench_config[key]) {
- conf[key] = bench_config[key];
- }
- }
- }
- }
-
- // get ports from bench/config.json
- read_config('config.json');
- read_config('sites/common_site_config.json');
-
- // detect current site
- if(fs.existsSync('sites/currentsite.txt')) {
- conf.default_site = fs.readFileSync('sites/currentsite.txt').toString().trim();
- }
-
- return conf;
- }
|