diff --git a/index.js b/index.js index 793ed40..42ed4ec 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,19 @@ 'use strict'; module.exports.create = function (opts) { + var machine; + + if (!opts.onMessage && !opts.onmessage) { + machine = new (require('events').EventEmitter)(); + } + + machine.onMessage = opts.onmessage || opts.onMessage; + machine.onmessage = opts.onmessage || opts.onMessage; + machine.onError = opts.onerror || opts.onError; + machine.onerror = opts.onerror || opts.onError; + machine.onEnd = opts.onend || opts.onEnd; + machine.onend = opts.onend || opts.onEnd; - var machine = { onMessage: opts.onMessage }; machine._version = 1; machine.state = 0; machine.states = { 0: 'version', 1: 'headerLength', 2: 'header', 3: 'data'/*, 4: 'error'*/ }; @@ -95,6 +106,7 @@ module.exports.create = function (opts) { //console.log('curSize:', curSize); //console.log('bodyLen:', machine.bodyLen, typeof machine.bodyLen); var partLen = 0; + var msg; partLen = Math.min(machine.bodyLen - machine.bufIndex, chunk.length - machine.chunkIndex); @@ -125,13 +137,69 @@ module.exports.create = function (opts) { } machine.bufIndex += partLen; - machine.onMessage({ - family: machine.family - , address: machine.address - , port: machine.port - , data: machine.buf.slice(0, machine.bufIndex) - , service: machine.service - }); + machine.service = machine.service || machine.type; + machine.type = machine.type || machine.service; + + + // + // data, end, error + // + if ('end' === machine.type) { + msg = {}; + + msg.family = machine.family; + msg.address = machine.address; + msg.port = machine.port; + msg.service = 'end'; + msg.type = msg.type || 'end'; + + if (machine.emit) { + machine.emit('tunnelEnd', msg); + } + else { + (machine.onend||machine.onmessage)(msg); + } + } + else if ('error' === machine.type) { + try { + msg = JSON.parse(machine.data.toString()); + } catch(e) { + msg = new Error('unknown error'); + } + + msg.family = machine.family; + msg.address = machine.address; + msg.port = machine.port; + msg.service = 'error'; + msg.type = msg.type || 'error'; + + if (machine.emit) { + machine.emit('tunnelError', msg); + } + else { + (machine.onerror||machine.onError)(msg); + } + } + else { + msg = { + data: machine.buf.slice(0, machine.bufIndex) + , service: machine.service + , type: machine.type + }; + + msg.family = machine.family; + msg.address = machine.address; + msg.port = machine.port; + msg.service = machine.service; + msg.type = machine.type || 'data'; + + if (machine.emit) { + machine.emit('tunnelData', msg); + } + else { + machine.onmessage(msg); + } + } machine.chunkIndex += partLen; // === chunk.length machine.buf = null; // reset to null