diff --git a/package.json b/package.json index 99ab010..7c85863 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "localhost.daplie.me-certificates": "^1.3.3", "redirect-https": "^1.1.0", "sni": "^1.0.0", - "tunnel-packer": "^1.3.0", + "tunnel-packer": "^1.4.0", "ws": "^2.2.3" } } diff --git a/wstunneld.js b/wstunneld.js index 786e2fa..d09a6a2 100644 --- a/wstunneld.js +++ b/wstunneld.js @@ -76,6 +76,9 @@ module.exports.create = function (copts) { return result || socketId; } + function sendTunnelMsg(addr, data, service) { + ws.send(packer.pack(addr, data, service), {binary: true}); + } function getBrowserConn(cid) { var browserConn; @@ -125,6 +128,16 @@ module.exports.create = function (copts) { token.ws = ws; token.clients = {}; + token.pausedConns = []; + ws._socket.on('drain', function () { + token.pausedConns.forEach(function (conn) { + if (!conn.manualPause) { + conn.resume(); + } + }); + token.pausedConns.length = 0; + }); + token.domains.forEach(function (domainname) { console.log('domainname', domainname); Devices.add(deviceLists, domainname, token); @@ -170,7 +183,7 @@ module.exports.create = function (copts) { if (firstToken) { var err = addToken(firstToken); if (err) { - ws.send(packer.pack(null, [0, err], 'control')); + sendTunnelMsg(null, [0, err], 'control'); ws.close(); return; } @@ -200,7 +213,7 @@ module.exports.create = function (copts) { if (!Array.isArray(cmd) || typeof cmd[0] !== 'number') { var msg = 'received bad command "' + opts.data.toString() + '"'; console.warn(msg, 'from websocket', socketId); - ws.send(packer.pack(null, [0, {message: msg, code: 'E_BAD_COMMAND'}], 'control')); + sendTunnelMsg(null, [0, {message: msg, code: 'E_BAD_COMMAND'}], 'control'); return; } @@ -229,19 +242,53 @@ module.exports.create = function (copts) { err = { message: 'unknown command "'+cmd[1]+'"', code: 'E_UNKNOWN_COMMAND' }; } - ws.send(packer.pack(null, [-cmd[0], err], 'control')); + sendTunnelMsg(null, [-cmd[0], err], 'control'); } + , onmessage: function (opts) { var cid = packer.addrToId(opts); console.log("remote '" + logName() + "' has data for '" + cid + "'", opts.data.byteLength); var browserConn = getBrowserConn(cid); - if (browserConn) { - browserConn.write(opts.data); - } else { - ws.send(packer.pack(opts, null, 'error')); + if (!browserConn) { + sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error'); + return; + } + + browserConn.write(opts.data); + // If we have more than 1MB buffered data we need to tell the other side to slow down. + // Once we've finished sending what we have we can tell the other side to keep going. + if (browserConn.bufferSize > 1024*1024) { + sendTunnelMsg(opts, null, 'pause'); + browserConn.once('drain', function () { + sendTunnelMsg(opts, null, 'resume'); + }); } } + + , onpause: function (opts) { + var cid = packer.addrToId(opts); + console.log('[TunnelPause]', cid); + var browserConn = getBrowserConn(cid); + if (browserConn) { + browserConn.manualPause = true; + browserConn.pause(); + } else { + sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error'); + } + } + , onresume: function (opts) { + var cid = packer.addrToId(opts); + console.log('[TunnelResume]', cid); + var browserConn = getBrowserConn(cid); + if (browserConn) { + browserConn.manualPause = false; + browserConn.resume(); + } else { + sendTunnelMsg(opts, {message: 'no matching connection', code: 'E_NO_CONN'}, 'error'); + } + } + , onend: function (opts) { var cid = packer.addrToId(opts); console.log('[TunnelEnd]', cid); @@ -321,7 +368,7 @@ module.exports.create = function (copts) { ws.on('error', hangup); // We only ever send one command and we send it once, so we just hard code the ID as 1 - ws.send(packer.pack(null, [1, 'hello', [unpacker._version], Object.keys(commandHandlers)], 'control')); + sendTunnelMsg(null, [1, 'hello', [unpacker._version], Object.keys(commandHandlers)], 'control'); } function pipeWs(servername, service, conn, remote) { @@ -337,6 +384,17 @@ module.exports.create = function (copts) { if (remote.ws) { try { remote.ws.send(packer.pack(browserAddr, data, serviceOverride), { binary: true }); + // If we can't send data over the websocket as fast as this connection can send it to us + // (or there are a lot of connections trying to send over the same websocket) then we + // need to pause the connection for a little. We pause all connections if any are paused + // to make things more fair so a connection doesn't get stuck waiting for everyone else + // to finish because it got caught on the boundary. + if (!serviceOverride) { + if (remote.pausedConns.length || remote.ws.bufferedAmount > 16*1024*1024) { + conn.pause(); + remote.pausedConns.push(conn); + } + } } catch (err) { console.warn('[pipeWs] error sending websocket message', err); }