Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 93 additions & 3 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,16 @@ function howMuchToRead(n, state) {
return (state[kState] & kEnded) !== 0 ? state.length : 0;
}

Readable.prototype.readv = function readv () {
return _read.call(this, true);
};

Readable.prototype.read = function read () {
return _read.call(this, false);
}

// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
function _read (n, returnArr) {
debug('read', n);
// Same as parseInt(undefined, 10), however V8 7.3 performance regressed
// in this scenario, so we are doing it manually.
Expand Down Expand Up @@ -748,7 +756,7 @@ Readable.prototype.read = function(n) {

let ret;
if (n > 0)
ret = fromList(n, state);
ret = returnArr ? arrFromList(n, state) : fromList(n, state);
else
ret = null;

Expand Down Expand Up @@ -777,7 +785,13 @@ Readable.prototype.read = function(n) {

if (ret !== null && (state[kState] & (kErrorEmitted | kCloseEmitted)) === 0) {
state[kState] |= kDataEmitted;
this.emit('data', ret);
if (returnArr) {
for (let i = 0; i < ret.length; ++i) {
this.emit('data', ret[i]);
}
} else {
this.emit('data', ret);
}
}

return ret;
Expand Down Expand Up @@ -1682,6 +1696,82 @@ function fromList(n, state) {
return ret;
}

function arrFromList(n, state) {
// nothing buffered.
if (state.length === 0)
return null;

let idx = state.bufferIndex;
let ret;

const buf = state.buffer;
const len = buf.length;

if ((state[kState] & kObjectMode) !== 0 || !n || n >= state.length) {
ret = buf.slice(idx);
idx += ret.length;
} else if (n < buf[idx].length) {
// `slice` is the same for buffers and strings.
ret = [buf[idx].slice(0, n)];
buf[idx] = buf[idx].slice(n);
} else if (n === buf[idx].length) {
// First chunk is a perfect match.
ret = [buf[idx]];
buf[idx++] = null;
} else if ((state[kState] & kDecoder) !== 0) {
ret = [];
while (idx < len) {
const str = buf[idx];
if (n > str.length) {
ret.push(str);
n -= str.length;
buf[idx++] = null;
} else {
if (n === buf.length) {
ret.push(str);
buf[idx++] = null;
} else {
ret.push(str.slice(0, n));
buf[idx] = str.slice(n);
}
break;
}
}
} else {
ret = [];
const retLen = n;
while (idx < len) {
const data = buf[idx];
if (n > data.length) {
ret.push(data);
n -= data.length;
buf[idx++] = null;
} else {
if (n === data.length) {
ret.push(data);
buf[idx++] = null;
} else {
ret.push(new FastBuffer(data.buffer, data.byteOffset, n));
buf[idx] = new FastBuffer(data.buffer, data.byteOffset + n, data.length - n);
}
break;
}
}
}

if (idx === len) {
state.buffer.length = 0;
state.bufferIndex = 0;
} else if (idx > 1024) {
state.buffer.splice(0, idx);
state.bufferIndex = 0;
} else {
state.bufferIndex = idx;
}

return ret;
}

function endReadable(stream) {
const state = stream._readableState;

Expand Down