implemented throttling when we buffer too much data

This commit is contained in:
tigerbot 2017-09-07 17:10:47 -06:00
parent bbdb09902b
commit 96ab344b71
2 changed files with 67 additions and 9 deletions

View File

@ -53,7 +53,7 @@
"localhost.daplie.me-certificates": "^1.3.3", "localhost.daplie.me-certificates": "^1.3.3",
"redirect-https": "^1.1.0", "redirect-https": "^1.1.0",
"sni": "^1.0.0", "sni": "^1.0.0",
"tunnel-packer": "^1.3.0", "tunnel-packer": "^1.4.0",
"ws": "^2.2.3" "ws": "^2.2.3"
} }
} }

View File

@ -76,6 +76,9 @@ module.exports.create = function (copts) {
return result || socketId; return result || socketId;
} }
function sendTunnelMsg(addr, data, service) {
ws.send(packer.pack(addr, data, service), {binary: true});
}
function getBrowserConn(cid) { function getBrowserConn(cid) {
var browserConn; var browserConn;
@ -125,6 +128,16 @@ module.exports.create = function (copts) {
token.ws = ws; token.ws = ws;
token.clients = {}; token.clients = {};
token.pausedConns = [];
ws._socket.on('drain', function () {
token.pausedConns.forEach(function (conn) {
if (!conn.manualPause) {
conn.resume();
}
});
token.pausedConns.length = 0;
});
token.domains.forEach(function (domainname) { token.domains.forEach(function (domainname) {
console.log('domainname', domainname); console.log('domainname', domainname);
Devices.add(deviceLists, domainname, token); Devices.add(deviceLists, domainname, token);
@ -170,7 +183,7 @@ module.exports.create = function (copts) {
if (firstToken) { if (firstToken) {
var err = addToken(firstToken); var err = addToken(firstToken);
if (err) { if (err) {
ws.send(packer.pack(null, [0, err], 'control')); sendTunnelMsg(null, [0, err], 'control');
ws.close(); ws.close();
return; return;
} }
@ -200,7 +213,7 @@ module.exports.create = function (copts) {
if (!Array.isArray(cmd) || typeof cmd[0] !== 'number') { if (!Array.isArray(cmd) || typeof cmd[0] !== 'number') {
var msg = 'received bad command "' + opts.data.toString() + '"'; var msg = 'received bad command "' + opts.data.toString() + '"';
console.warn(msg, 'from websocket', socketId); console.warn(msg, 'from websocket', socketId);
ws.send(packer.pack(null, [0, {message: msg, code: 'E_BAD_COMMAND'}], 'control')); sendTunnelMsg(null, [0, {message: msg, code: 'E_BAD_COMMAND'}], 'control');
return; return;
} }
@ -229,19 +242,53 @@ module.exports.create = function (copts) {
err = { message: 'unknown command "'+cmd[1]+'"', code: 'E_UNKNOWN_COMMAND' }; err = { message: 'unknown command "'+cmd[1]+'"', code: 'E_UNKNOWN_COMMAND' };
} }
ws.send(packer.pack(null, [-cmd[0], err], 'control')); sendTunnelMsg(null, [-cmd[0], err], 'control');
} }
, onmessage: function (opts) { , onmessage: function (opts) {
var cid = packer.addrToId(opts); var cid = packer.addrToId(opts);
console.log("remote '" + logName() + "' has data for '" + cid + "'", opts.data.byteLength); console.log("remote '" + logName() + "' has data for '" + cid + "'", opts.data.byteLength);
var browserConn = getBrowserConn(cid); var browserConn = getBrowserConn(cid);
if (browserConn) { if (!browserConn) {
sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error');
return;
}
browserConn.write(opts.data); browserConn.write(opts.data);
// If we have more than 1MB buffered data we need to tell the other side to slow down.
// Once we've finished sending what we have we can tell the other side to keep going.
if (browserConn.bufferSize > 1024*1024) {
sendTunnelMsg(opts, null, 'pause');
browserConn.once('drain', function () {
sendTunnelMsg(opts, null, 'resume');
});
}
}
, onpause: function (opts) {
var cid = packer.addrToId(opts);
console.log('[TunnelPause]', cid);
var browserConn = getBrowserConn(cid);
if (browserConn) {
browserConn.manualPause = true;
browserConn.pause();
} else { } else {
ws.send(packer.pack(opts, null, 'error')); sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error');
} }
} }
, onresume: function (opts) {
var cid = packer.addrToId(opts);
console.log('[TunnelResume]', cid);
var browserConn = getBrowserConn(cid);
if (browserConn) {
browserConn.manualPause = false;
browserConn.resume();
} else {
sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error');
}
}
, onend: function (opts) { , onend: function (opts) {
var cid = packer.addrToId(opts); var cid = packer.addrToId(opts);
console.log('[TunnelEnd]', cid); console.log('[TunnelEnd]', cid);
@ -321,7 +368,7 @@ module.exports.create = function (copts) {
ws.on('error', hangup); ws.on('error', hangup);
// We only ever send one command and we send it once, so we just hard code the ID as 1 // We only ever send one command and we send it once, so we just hard code the ID as 1
ws.send(packer.pack(null, [1, 'hello', [unpacker._version], Object.keys(commandHandlers)], 'control')); sendTunnelMsg(null, [1, 'hello', [unpacker._version], Object.keys(commandHandlers)], 'control');
} }
function pipeWs(servername, service, conn, remote) { function pipeWs(servername, service, conn, remote) {
@ -337,6 +384,17 @@ module.exports.create = function (copts) {
if (remote.ws) { if (remote.ws) {
try { try {
remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true });
// If we can't send data over the websocket as fast as this connection can send it to us
// (or there are a lot of connections trying to send over the same websocket) then we
// need to pause the connection for a little. We pause all connections if any are paused
// to make things more fair so a connection doesn't get stuck waiting for everyone else
// to finish because it got caught on the boundary.
if (!serviceOverride) {
if (remote.pausedConns.length || remote.ws.bufferedAmount > 16*1024*1024) {
conn.pause();
remote.pausedConns.push(conn);
}
}
} catch (err) { } catch (err) {
console.warn('[pipeWs] error sending websocket message', err); console.warn('[pipeWs] error sending websocket message', err);
} }