var Transform = require('stream').Transform; var allocBuffer = require('./alloc-buffer').allocBuffer; /** @type {Transform} */ function EventMessageChunkerStream(options) { Transform.call(this, options); this.currentMessageTotalLength = 0; this.currentMessagePendingLength = 0; /** @type {Buffer} */ this.currentMessage = null; /** @type {Buffer} */ this.messageLengthBuffer = null; } EventMessageChunkerStream.prototype = Object.create(Transform.prototype); /** * * @param {Buffer} chunk * @param {string} encoding * @param {*} callback */ EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) { var chunkLength = chunk.length; var currentOffset = 0; while (currentOffset < chunkLength) { // create new message if necessary if (!this.currentMessage) { // working on a new message, determine total length var bytesRemaining = chunkLength - currentOffset; // prevent edge case where total length spans 2 chunks if (!this.messageLengthBuffer) { this.messageLengthBuffer = allocBuffer(4); } var numBytesForTotal = Math.min( 4 - this.currentMessagePendingLength, // remaining bytes to fill the messageLengthBuffer bytesRemaining // bytes left in chunk ); chunk.copy( this.messageLengthBuffer, this.currentMessagePendingLength, currentOffset, currentOffset + numBytesForTotal ); this.currentMessagePendingLength += numBytesForTotal; currentOffset += numBytesForTotal; if (this.currentMessagePendingLength < 4) { // not enough information to create the current message break; } this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0)); this.messageLengthBuffer = null; } // write data into current message var numBytesToWrite = Math.min( this.currentMessageTotalLength - this.currentMessagePendingLength, // number of bytes left to complete message chunkLength - currentOffset // number of bytes left in the original chunk ); chunk.copy( this.currentMessage, // target buffer this.currentMessagePendingLength, // target offset currentOffset, // chunk offset currentOffset + numBytesToWrite // chunk end to write ); this.currentMessagePendingLength += numBytesToWrite; currentOffset += numBytesToWrite; // check if a message is ready to be pushed if (this.currentMessageTotalLength && this.currentMessageTotalLength === this.currentMessagePendingLength) { // push out the message this.push(this.currentMessage); // cleanup this.currentMessage = null; this.currentMessageTotalLength = 0; this.currentMessagePendingLength = 0; } } callback(); }; EventMessageChunkerStream.prototype._flush = function(callback) { if (this.currentMessageTotalLength) { if (this.currentMessageTotalLength === this.currentMessagePendingLength) { callback(null, this.currentMessage); } else { callback(new Error('Truncated event message received.')); } } else { callback(); } }; /** * @param {number} size Size of the message to be allocated. * @api private */ EventMessageChunkerStream.prototype.allocateMessage = function(size) { if (typeof size !== 'number') { throw new Error('Attempted to allocate an event message where size was not a number: ' + size); } this.currentMessageTotalLength = size; this.currentMessagePendingLength = 4; this.currentMessage = allocBuffer(size); this.currentMessage.writeUInt32BE(size, 0); }; /** * @api private */ module.exports = { EventMessageChunkerStream: EventMessageChunkerStream };