went back to not allowing half-open connections

This commit is contained in:
tigerbot 2017-09-08 11:24:30 -06:00
parent 96ab344b71
commit a8be74d77e
3 changed files with 74 additions and 57 deletions

View File

@ -183,7 +183,7 @@ require('../handlers').create(program); // adds directly to program for now...
var wss = new WebSocketServer({ server: (program.httpTunnelServer || program.httpServer) }); var wss = new WebSocketServer({ server: (program.httpTunnelServer || program.httpServer) });
wss.on('connection', netConnHandlers.ws); wss.on('connection', netConnHandlers.ws);
program.ports.forEach(function (port) { program.ports.forEach(function (port) {
var tcp3000 = net.createServer({ allowHalfOpen: true }); var tcp3000 = net.createServer();
tcp3000.listen(port, function () { tcp3000.listen(port, function () {
console.log('listening on ' + port); console.log('listening on ' + port);
}); });

View File

@ -46,6 +46,7 @@
}, },
"homepage": "https://github.com/Daplie/node-tunnel-server#readme", "homepage": "https://github.com/Daplie/node-tunnel-server#readme",
"dependencies": { "dependencies": {
"bluebird": "^3.5.0",
"cluster-store": "^2.0.4", "cluster-store": "^2.0.4",
"commander": "^2.9.0", "commander": "^2.9.0",
"greenlock": "^2.1.12", "greenlock": "^2.1.12",

View File

@ -2,9 +2,16 @@
var sni = require('sni'); var sni = require('sni');
var url = require('url'); var url = require('url');
var PromiseA = require('bluebird');
var jwt = require('jsonwebtoken'); var jwt = require('jsonwebtoken');
var packer = require('tunnel-packer'); var packer = require('tunnel-packer');
function timeoutPromise(duration) {
return new PromiseA(function (resolve) {
setTimeout(resolve, duration);
});
}
var Devices = {}; var Devices = {};
Devices.add = function (store, servername, newDevice) { Devices.add = function (store, servername, newDevice) {
var devices = store[servername] || []; var devices = store[servername] || [];
@ -91,6 +98,51 @@ module.exports.create = function (copts) {
return browserConn; return browserConn;
} }
function closeBrowserConn(cid) {
var remote;
Object.keys(remotes).some(function (jwtoken) {
if (remotes[jwtoken].clients[cid]) {
remote = remotes[jwtoken];
return true;
}
});
if (!remote) {
return;
}
PromiseA.resolve().then(function () {
var conn = remote.clients[cid];
conn.tunnelClosing = true;
conn.end();
// If no data is buffered for writing then we don't need to wait for it to drain.
if (!conn.bufferSize) {
return timeoutPromise(500);
}
// Otherwise we want the connection to be able to finish, but we also want to impose
// a time limit for it to drain, since it shouldn't have more than 1MB buffered.
return new PromiseA(function (resolve) {
var timeoutId = setTimeout(resolve, 60*1000);
conn.once('drain', function () {
clearTimeout(timeoutId);
setTimeout(resolve, 500);
});
});
}).then(function () {
if (remote.clients[cid]) {
console.warn(cid, 'browser connection still present after calling `end`');
remote.clients[cid].destroy();
return timeoutPromise(500);
}
}).then(function () {
if (remote.clients[cid]) {
console.error(cid, 'browser connection still present after calling `destroy`');
delete remote.clients[cid];
}
}).catch(function (err) {
console.warn('failed to close browser connection', cid, err);
});
}
function addToken(jwtoken) { function addToken(jwtoken) {
if (remotes[jwtoken]) { if (remotes[jwtoken]) {
@ -132,6 +184,7 @@ module.exports.create = function (copts) {
ws._socket.on('drain', function () { ws._socket.on('drain', function () {
token.pausedConns.forEach(function (conn) { token.pausedConns.forEach(function (conn) {
if (!conn.manualPause) { if (!conn.manualPause) {
console.log('resuming', conn.tunnelCid, 'now that the web socket has caught up');
conn.resume(); conn.resume();
} }
}); });
@ -162,7 +215,7 @@ module.exports.create = function (copts) {
// Close all of the existing browser connections associated with this websocket connection. // Close all of the existing browser connections associated with this websocket connection.
Object.keys(remote.clients).forEach(function (cid) { Object.keys(remote.clients).forEach(function (cid) {
remote.clients[cid].end(); closeBrowserConn(cid);
}); });
delete remotes[jwtoken]; delete remotes[jwtoken];
console.log("removed token '" + remote.deviceId + "' from websocket", socketId); console.log("removed token '" + remote.deviceId + "' from websocket", socketId);
@ -258,10 +311,15 @@ module.exports.create = function (copts) {
browserConn.write(opts.data); browserConn.write(opts.data);
// If we have more than 1MB buffered data we need to tell the other side to slow down. // If we have more than 1MB buffered data we need to tell the other side to slow down.
// Once we've finished sending what we have we can tell the other side to keep going. // Once we've finished sending what we have we can tell the other side to keep going.
if (browserConn.bufferSize > 1024*1024) { // If we've already sent the 'pause' message though don't send it again, because we're
// probably just dealing with data queued before our message got to them.
if (!browserConn.remotePaused && browserConn.bufferSize > 1024*1024) {
sendTunnelMsg(opts, null, 'pause'); sendTunnelMsg(opts, null, 'pause');
browserConn.remotePaused = true;
browserConn.once('drain', function () { browserConn.once('drain', function () {
sendTunnelMsg(opts, null, 'resume'); sendTunnelMsg(opts, null, 'resume');
browserConn.remotePaused = false;
}); });
} }
} }
@ -292,18 +350,12 @@ module.exports.create = function (copts) {
, onend: function (opts) { , onend: function (opts) {
var cid = packer.addrToId(opts); var cid = packer.addrToId(opts);
console.log('[TunnelEnd]', cid); console.log('[TunnelEnd]', cid);
var browserConn = getBrowserConn(cid); closeBrowserConn(cid);
if (browserConn) {
browserConn.end();
}
} }
, onerror: function (opts) { , onerror: function (opts) {
var cid = packer.addrToId(opts); var cid = packer.addrToId(opts);
console.log('[TunnelError]', cid); console.log('[TunnelError]', cid, opts.message);
var browserConn = getBrowserConn(cid); closeBrowserConn(cid);
if (browserConn) {
browserConn.destroy();
}
} }
}; };
var unpacker = packer.create(packerHandlers); var unpacker = packer.create(packerHandlers);
@ -377,46 +429,30 @@ module.exports.create = function (copts) {
var browserAddr = packer.socketToAddr(conn); var browserAddr = packer.socketToAddr(conn);
browserAddr.service = service; browserAddr.service = service;
var cid = packer.addrToId(browserAddr); var cid = packer.addrToId(browserAddr);
conn.tunnelCid = cid;
console.log('[pipeWs] browser is', cid, 'home-cloud is', packer.socketToId(remote.ws.upgradeReq.socket)); console.log('[pipeWs] browser is', cid, 'home-cloud is', packer.socketToId(remote.ws.upgradeReq.socket));
var sentEnd = false;
function sendWs(data, serviceOverride) { function sendWs(data, serviceOverride) {
if (remote.ws) { if (remote.ws && !conn.tunnelClosing) {
try { try {
remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true });
// If we can't send data over the websocket as fast as this connection can send it to us // If we can't send data over the websocket as fast as this connection can send it to us
// (or there are a lot of connections trying to send over the same websocket) then we // (or there are a lot of connections trying to send over the same websocket) then we
// need to pause the connection for a little. We pause all connections if any are paused // need to pause the connection for a little. We pause all connections if any are paused
// to make things more fair so a connection doesn't get stuck waiting for everyone else // to make things more fair so a connection doesn't get stuck waiting for everyone else
// to finish because it got caught on the boundary. // to finish because it got caught on the boundary. Also if serviceOverride is set it
if (!serviceOverride) { // means the connection is over, so no need to pause it.
if (remote.pausedConns.length || remote.ws.bufferedAmount > 16*1024*1024) { if (!serviceOverride && (remote.pausedConns.length || remote.ws._socket.bufferSize > 1024*1024)) {
console.log('pausing', cid, 'to allow web socket to catch up');
conn.pause(); conn.pause();
remote.pausedConns.push(conn); remote.pausedConns.push(conn);
} }
}
} catch (err) { } catch (err) {
console.warn('[pipeWs] error sending websocket message', err); console.warn('[pipeWs] error sending websocket message', err);
} }
} }
} }
var trueEnd = conn.end;
conn.end = function () {
// delete the connection from the clients to make sure nothing more can be written, then
// call the actual end function to clost the write part of the connection.
delete remote.clients[cid];
trueEnd.apply(conn, arguments);
var timeoutId = setTimeout(function () {
console.warn('[pipeWs] browser connection', cid, 'still open 1 min after sending `end`');
conn.destroy();
}, 60*1000);
conn.on('close', function () {
clearTimeout(timeoutId);
});
};
remote.clients[cid] = conn; remote.clients[cid] = conn;
conn.on('data', function (chunk) { conn.on('data', function (chunk) {
console.log('[pipeWs] data from browser to tunneler', chunk.byteLength); console.log('[pipeWs] data from browser to tunneler', chunk.byteLength);
@ -425,30 +461,10 @@ module.exports.create = function (copts) {
conn.on('error', function (err) { conn.on('error', function (err) {
console.warn('[pipeWs] browser connection error', err); console.warn('[pipeWs] browser connection error', err);
}); });
conn.on('end', function () {
if (!sentEnd) {
sendWs(null, 'end');
sentEnd = true;
}
// Only add timeout to make sure other side is eventually closed if it isn't already closed.
if (remote.clients[cid]) {
var timeoutId = setTimeout(function () {
console.warn('[pipeWs] browser connection', cid, 'still open 1 min after receiving `end`');
conn.destroy();
}, 60*1000);
conn.on('close', function () {
clearTimeout(timeoutId);
});
}
});
conn.on('close', function (hadErr) { conn.on('close', function (hadErr) {
console.log('[pipeWs] browser connection closing'); console.log('[pipeWs] browser connection closing');
delete remote.clients[cid];
if (!sentEnd) {
sendWs(null, hadErr ? 'error': 'end'); sendWs(null, hadErr ? 'error': 'end');
sentEnd = true; delete remote.clients[cid];
}
}); });
} }