improved throttle of connections based on websocket speed

This commit is contained in:
tigerbot 2017-09-11 15:35:03 -06:00
parent d5992f001f
commit 4a4765d6fb
1 changed files with 22 additions and 9 deletions

View File

@ -52,7 +52,7 @@ function run(copts) {
// stuff waiting for all other connections to finish because it tried writing near the border. // stuff waiting for all other connections to finish because it tried writing near the border.
var bufSize = wsHandlers.sendMessage(Packer.pack(opts, chunk)); var bufSize = wsHandlers.sendMessage(Packer.pack(opts, chunk));
if (pausedClients.length || bufSize > 1024*1024) { if (pausedClients.length || bufSize > 1024*1024) {
console.log('paused connection', cid, 'to allow websocket to catch up'); // console.log('[onLocalData] paused connection', cid, 'to allow websocket to catch up');
conn.pause(); conn.pause();
pausedClients.push(conn); pausedClients.push(conn);
} }
@ -97,15 +97,19 @@ function run(copts) {
} }
conn.write(opts.data); conn.write(opts.data);
if (conn.bufferSize > 1024*1024) {
wsHandlers.sendMessage(Packer.pack(opts, null, 'pause'));
conn.once('drain', function () {
wsHandlers.sendMessage(Packer.pack(opts, null, 'resume'));
});
}
// It might seem weird to increase the "read" value in a function named `write`, but this // It might seem weird to increase the "read" value in a function named `write`, but this
// is bytes read from the tunnel and written to the local connection. // is bytes read from the tunnel and written to the local connection.
conn.tunnelRead += opts.data.byteLength; conn.tunnelRead += opts.data.byteLength;
if (!conn.remotePaused && conn.bufferSize > 1024*1024) {
wsHandlers.sendMessage(Packer.pack(opts, conn.tunnelRead, 'pause'));
conn.remotePaused = true;
conn.once('drain', function () {
wsHandlers.sendMessage(Packer.pack(opts, conn.tunnelRead, 'resume'));
conn.remotePaused = false;
});
}
return true; return true;
} }
@ -416,9 +420,18 @@ function run(copts) {
timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout); timeoutId = setTimeout(wsHandlers.checkTimeout, activityTimeout);
wstunneler._socket.on('drain', function () { wstunneler._socket.on('drain', function () {
// the websocket library has it's own buffer apart from node's socket buffer, but that one
// is much more difficult to watch, so we watch for the lower level buffer to drain and
// then check to see if the upper level buffer is still too full to write to. Note that
// the websocket library buffer has something to do with compression, so I'm not requiring
// that to be 0 before we start up again.
if (wstunneler.bufferedAmount > 128*1024) {
return;
}
pausedClients.forEach(function (conn) { pausedClients.forEach(function (conn) {
if (!conn.manualPause) { if (!conn.manualPause) {
console.log('resuming connection', conn.tunnelCid, 'now the websocket has caught up'); // console.log('resuming connection', conn.tunnelCid, 'now the websocket has caught up');
conn.resume(); conn.resume();
} }
}); });
@ -458,7 +471,7 @@ function run(copts) {
if (wstunneler) { if (wstunneler) {
try { try {
wstunneler.send(msg, {binary: true}); wstunneler.send(msg, {binary: true});
return wstunneler._socket.bufferSize; return wstunneler.bufferedAmount;
} catch (err) { } catch (err) {
// There is a chance that this occurred after the websocket was told to close // There is a chance that this occurred after the websocket was told to close
// and before it finished, in which case we don't need to log the error. // and before it finished, in which case we don't need to log the error.