gl-store-s3.js/node_modules/aws-sdk/lib/event-stream/event-message-chunker-strea...

121 lines
4.0 KiB
JavaScript

var Transform = require('stream').Transform;
var allocBuffer = require('./alloc-buffer').allocBuffer;
/**
 * Size, in bytes, of the big-endian unsigned integer that starts every
 * message. Its value is the length of the whole message, prefix included.
 */
var LENGTH_PREFIX_BYTES = 4;

/**
 * Transform stream that re-frames an arbitrary sequence of byte chunks into
 * complete length-prefixed messages. Incoming chunk boundaries are ignored:
 * one chunk may hold several messages, and one message (or even its length
 * prefix) may be spread over several chunks. Each message is pushed
 * downstream as a single Buffer that still contains its length prefix.
 *
 * @constructor
 * @extends Transform
 * @param {object} [options] Passed through unchanged to the Transform constructor.
 */
function EventMessageChunkerStream(options) {
    Transform.call(this, options);

    /** Declared total length of the message in progress; 0 when none has been allocated. */
    this.currentMessageTotalLength = 0;
    /** Bytes gathered so far for the message in progress, length prefix included. */
    this.currentMessagePendingLength = 0;
    /** @type {Buffer} Buffer the message in progress is assembled into. */
    this.currentMessage = null;
    /** @type {Buffer} Scratch buffer collecting a length prefix that may span chunks. */
    this.messageLengthBuffer = null;
}

// Inherit from Transform and restore the `constructor` link that a bare
// Object.create(Transform.prototype) would leave pointing at Transform.
EventMessageChunkerStream.prototype = Object.create(Transform.prototype, {
    constructor: {
        value: EventMessageChunkerStream,
        enumerable: false,
        writable: true,
        configurable: true
    }
});

/**
 * Consumes one chunk, pushing every message that becomes complete.
 *
 * A length prefix that cannot describe a valid message is reported through
 * `callback` rather than thrown, so it surfaces as a stream error.
 *
 * @param {Buffer} chunk
 * @param {string} encoding
 * @param {*} callback
 */
EventMessageChunkerStream.prototype._transform = function(chunk, encoding, callback) {
    var chunkLength = chunk.length;
    var currentOffset = 0;

    while (currentOffset < chunkLength) {
        if (!this.currentMessage) {
            // No message allocated yet: keep gathering the length prefix,
            // which may itself arrive split across chunks.
            if (!this.messageLengthBuffer) {
                this.messageLengthBuffer = allocBuffer(LENGTH_PREFIX_BYTES);
            }
            var numBytesForTotal = Math.min(
                LENGTH_PREFIX_BYTES - this.currentMessagePendingLength, // prefix bytes still missing
                chunkLength - currentOffset // bytes left in chunk
            );
            chunk.copy(
                this.messageLengthBuffer,
                this.currentMessagePendingLength,
                currentOffset,
                currentOffset + numBytesForTotal
            );
            this.currentMessagePendingLength += numBytesForTotal;
            currentOffset += numBytesForTotal;

            if (this.currentMessagePendingLength < LENGTH_PREFIX_BYTES) {
                // Chunk ended inside the prefix; wait for the next chunk.
                break;
            }

            try {
                this.allocateMessage(this.messageLengthBuffer.readUInt32BE(0));
            } catch (err) {
                // Hand the failure to the stream instead of throwing out of write().
                callback(err);
                return;
            }
            this.messageLengthBuffer = null;
        }

        // Copy as much of the message body as this chunk provides.
        var numBytesToWrite = Math.min(
            this.currentMessageTotalLength - this.currentMessagePendingLength, // bytes left to complete message
            chunkLength - currentOffset // bytes left in chunk
        );
        chunk.copy(
            this.currentMessage, // target buffer
            this.currentMessagePendingLength, // target offset
            currentOffset, // chunk start
            currentOffset + numBytesToWrite // chunk end
        );
        this.currentMessagePendingLength += numBytesToWrite;
        currentOffset += numBytesToWrite;

        if (this.currentMessageTotalLength === this.currentMessagePendingLength) {
            // Message complete: emit it and reset for the next one.
            this.push(this.currentMessage);
            this.currentMessage = null;
            this.currentMessageTotalLength = 0;
            this.currentMessagePendingLength = 0;
        }
    }

    callback();
};

/**
 * Called when the input ends. Succeeds only if the input stopped on a message
 * boundary; any leftover bytes - including a partially received length
 * prefix - are reported as a truncated message.
 *
 * @param {*} callback
 */
EventMessageChunkerStream.prototype._flush = function(callback) {
    if (!this.currentMessage && this.currentMessagePendingLength === 0) {
        // Nothing buffered: the input ended cleanly.
        callback();
        return;
    }
    if (this.currentMessage && this.currentMessageTotalLength === this.currentMessagePendingLength) {
        callback(null, this.currentMessage);
        return;
    }
    callback(new Error('Truncated event message received.'));
};

/**
 * Starts a new message of `size` bytes and writes the length prefix into it.
 * Instance state is only updated once the buffer has been created, so a
 * failure leaves the stream state as it was.
 *
 * @param {number} size Size of the message to be allocated.
 * @throws {Error} If `size` is not a number.
 * @throws {RangeError} If `size` is too small to hold the length prefix.
 * @api private
 */
EventMessageChunkerStream.prototype.allocateMessage = function(size) {
    if (typeof size !== 'number') {
        throw new Error('Attempted to allocate an event message where size was not a number: ' + size);
    }
    // Written as a negated >= so that NaN is rejected as well.
    if (!(size >= LENGTH_PREFIX_BYTES)) {
        throw new RangeError(
            'Attempted to allocate an event message shorter than its ' +
            LENGTH_PREFIX_BYTES + '-byte length prefix: ' + size
        );
    }
    var message = allocBuffer(size);
    message.writeUInt32BE(size, 0);

    this.currentMessageTotalLength = size;
    this.currentMessagePendingLength = LENGTH_PREFIX_BYTES;
    this.currentMessage = message;
};
/**
 * Module interface: exposes the chunker stream constructor under its own name.
 *
 * @api private
 */
module.exports = { EventMessageChunkerStream: EventMessageChunkerStream };