diff --git a/bin/stunneld.js b/bin/stunneld.js index e6e4eb1..0b47c5f 100755 --- a/bin/stunneld.js +++ b/bin/stunneld.js @@ -183,7 +183,7 @@ require('../handlers').create(program); // adds directly to program for now... var wss = new WebSocketServer({ server: (program.httpTunnelServer || program.httpServer) }); wss.on('connection', netConnHandlers.ws); program.ports.forEach(function (port) { - var tcp3000 = net.createServer({ allowHalfOpen: true }); + var tcp3000 = net.createServer(); tcp3000.listen(port, function () { console.log('listening on ' + port); }); diff --git a/package.json b/package.json index 7c85863..d1a505e 100644 --- a/package.json +++ b/package.json @@ -46,6 +46,7 @@ }, "homepage": "https://github.com/Daplie/node-tunnel-server#readme", "dependencies": { + "bluebird": "^3.5.0", "cluster-store": "^2.0.4", "commander": "^2.9.0", "greenlock": "^2.1.12", diff --git a/wstunneld.js b/wstunneld.js index d09a6a2..6190ecc 100644 --- a/wstunneld.js +++ b/wstunneld.js @@ -2,9 +2,16 @@ var sni = require('sni'); var url = require('url'); +var PromiseA = require('bluebird'); var jwt = require('jsonwebtoken'); var packer = require('tunnel-packer'); +function timeoutPromise(duration) { + return new PromiseA(function (resolve) { + setTimeout(resolve, duration); + }); +} + var Devices = {}; Devices.add = function (store, servername, newDevice) { var devices = store[servername] || []; @@ -91,6 +98,51 @@ module.exports.create = function (copts) { return browserConn; } + function closeBrowserConn(cid) { + var remote; + Object.keys(remotes).some(function (jwtoken) { + if (remotes[jwtoken].clients[cid]) { + remote = remotes[jwtoken]; + return true; + } + }); + if (!remote) { + return; + } + + PromiseA.resolve().then(function () { + var conn = remote.clients[cid]; + conn.tunnelClosing = true; + conn.end(); + + // If no data is buffered for writing then we don't need to wait for it to drain. + if (!conn.bufferSize) { + return timeoutPromise(500); + } + // Otherwise we want the connection to be able to finish, but we also want to impose + // a time limit for it to drain, since it shouldn't have more than 1MB buffered. + return new PromiseA(function (resolve) { + var timeoutId = setTimeout(resolve, 60*1000); + conn.once('drain', function () { + clearTimeout(timeoutId); + setTimeout(resolve, 500); + }); + }); + }).then(function () { + if (remote.clients[cid]) { + console.warn(cid, 'browser connection still present after calling `end`'); + remote.clients[cid].destroy(); + return timeoutPromise(500); + } + }).then(function () { + if (remote.clients[cid]) { + console.error(cid, 'browser connection still present after calling `destroy`'); + delete remote.clients[cid]; + } + }).catch(function (err) { + console.warn('failed to close browser connection', cid, err); + }); + } function addToken(jwtoken) { if (remotes[jwtoken]) { @@ -132,6 +184,7 @@ module.exports.create = function (copts) { ws._socket.on('drain', function () { token.pausedConns.forEach(function (conn) { if (!conn.manualPause) { + console.log('resuming', conn.tunnelCid, 'now that the web socket has caught up'); conn.resume(); } }); @@ -162,7 +215,7 @@ module.exports.create = function (copts) { // Close all of the existing browser connections associated with this websocket connection. Object.keys(remote.clients).forEach(function (cid) { - remote.clients[cid].end(); + closeBrowserConn(cid); }); delete remotes[jwtoken]; console.log("removed token '" + remote.deviceId + "' from websocket", socketId); @@ -258,10 +311,15 @@ module.exports.create = function (copts) { browserConn.write(opts.data); // If we have more than 1MB buffered data we need to tell the other side to slow down. // Once we've finished sending what we have we can tell the other side to keep going. - if (browserConn.bufferSize > 1024*1024) { + // If we've already sent the 'pause' message though don't send it again, because we're + // probably just dealing with data queued before our message got to them. + if (!browserConn.remotePaused && browserConn.bufferSize > 1024*1024) { sendTunnelMsg(opts, null, 'pause'); + browserConn.remotePaused = true; + browserConn.once('drain', function () { sendTunnelMsg(opts, null, 'resume'); + browserConn.remotePaused = false; }); } } @@ -292,18 +350,12 @@ module.exports.create = function (copts) { , onend: function (opts) { var cid = packer.addrToId(opts); console.log('[TunnelEnd]', cid); - var browserConn = getBrowserConn(cid); - if (browserConn) { - browserConn.end(); - } + closeBrowserConn(cid); } , onerror: function (opts) { var cid = packer.addrToId(opts); - console.log('[TunnelError]', cid); - var browserConn = getBrowserConn(cid); - if (browserConn) { - browserConn.destroy(); - } + console.log('[TunnelError]', cid, opts.message); + closeBrowserConn(cid); } }; var unpacker = packer.create(packerHandlers); @@ -377,23 +429,23 @@ module.exports.create = function (copts) { var browserAddr = packer.socketToAddr(conn); browserAddr.service = service; var cid = packer.addrToId(browserAddr); + conn.tunnelCid = cid; console.log('[pipeWs] browser is', cid, 'home-cloud is', packer.socketToId(remote.ws.upgradeReq.socket)); - var sentEnd = false; function sendWs(data, serviceOverride) { - if (remote.ws) { + if (remote.ws && !conn.tunnelClosing) { try { remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); // If we can't send data over the websocket as fast as this connection can send it to us // (or there are a lot of connections trying to send over the same websocket) then we // need to pause the connection for a little. We pause all connections if any are paused // to make things more fair so a connection doesn't get stuck waiting for everyone else - // to finish because it got caught on the boundary. - if (!serviceOverride) { - if (remote.pausedConns.length || remote.ws.bufferedAmount > 16*1024*1024) { - conn.pause(); - remote.pausedConns.push(conn); - } + // to finish because it got caught on the boundary. Also if serviceOverride is set it + // means the connection is over, so no need to pause it. + if (!serviceOverride && (remote.pausedConns.length || remote.ws._socket.bufferSize > 1024*1024)) { + console.log('pausing', cid, 'to allow web socket to catch up'); + conn.pause(); + remote.pausedConns.push(conn); } } catch (err) { console.warn('[pipeWs] error sending websocket message', err); @@ -401,22 +453,6 @@ module.exports.create = function (copts) { } } - var trueEnd = conn.end; - conn.end = function () { - // delete the connection from the clients to make sure nothing more can be written, then - // call the actual end function to clost the write part of the connection. - delete remote.clients[cid]; - trueEnd.apply(conn, arguments); - - var timeoutId = setTimeout(function () { - console.warn('[pipeWs] browser connection', cid, 'still open 1 min after sending `end`'); - conn.destroy(); - }, 60*1000); - conn.on('close', function () { - clearTimeout(timeoutId); - }); - }; - remote.clients[cid] = conn; conn.on('data', function (chunk) { console.log('[pipeWs] data from browser to tunneler', chunk.byteLength); @@ -425,30 +461,10 @@ module.exports.create = function (copts) { conn.on('error', function (err) { console.warn('[pipeWs] browser connection error', err); }); - conn.on('end', function () { - if (!sentEnd) { - sendWs(null, 'end'); - sentEnd = true; - } - - // Only add timeout to make sure other side is eventually closed if it isn't already closed. - if (remote.clients[cid]) { - var timeoutId = setTimeout(function () { - console.warn('[pipeWs] browser connection', cid, 'still open 1 min after receiving `end`'); - conn.destroy(); - }, 60*1000); - conn.on('close', function () { - clearTimeout(timeoutId); - }); - } - }); conn.on('close', function (hadErr) { console.log('[pipeWs] browser connection closing'); + sendWs(null, hadErr ? 'error': 'end'); delete remote.clients[cid]; - if (!sentEnd) { - sendWs(null, hadErr ? 'error': 'end'); - sentEnd = true; - } }); }