fs: make sure to write entire buffer

fs.write(v) is not guaranteed to write everything in a single
call. Make sure we don't assume so.

PR-URL: https://github.com/nodejs/node/pull/49211
Co-authored-by: Chemi Atlow <chemi@atlow.co.il>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
This commit is contained in:
Robert Nagy 2022-03-22 12:48:56 +01:00 committed by atlowChemi
parent 996f3904bc
commit feb5b0fef8
No known key found for this signature in database
GPG Key ID: E2DE4855C6AF0350
2 changed files with 104 additions and 15 deletions

View File

@ -13,8 +13,10 @@ const {
const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED,
ERR_SYSTEM_ERROR,
} = require('internal/errors').codes;
const {
deprecate,
@ -392,9 +394,67 @@ WriteStream.prototype.open = openWriteFs;
WriteStream.prototype._construct = _construct;
function writeAll(data, size, pos, cb, retries = 0) {
this[kFs].write(this.fd, data, 0, size, pos, (er, bytesWritten, buffer) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}
if (this.destroyed || er) {
return cb(er || new ERR_STREAM_DESTROYED('write'));
}
this.bytesWritten += bytesWritten;
retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;
// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR('write failed'));
} else if (size) {
writeAll.call(this, buffer.slice(bytesWritten), size, pos, cb, retries);
} else {
cb();
}
});
}
function writevAll(chunks, size, pos, cb, retries = 0) {
this[kFs].writev(this.fd, chunks, this.pos, (er, bytesWritten, buffers) => {
// No data currently available and operation should be retried later.
if (er?.code === 'EAGAIN') {
er = null;
bytesWritten = 0;
}
if (this.destroyed || er) {
return cb(er || new ERR_STREAM_DESTROYED('writev'));
}
this.bytesWritten += bytesWritten;
retries = bytesWritten ? 0 : retries + 1;
size -= bytesWritten;
pos += bytesWritten;
// Try writing non-zero number of bytes up to 5 times.
if (retries > 5) {
cb(new ERR_SYSTEM_ERROR('writev failed'));
} else if (size) {
writevAll.call(this, [Buffer.concat(buffers).slice(bytesWritten)], size, pos, cb, retries);
} else {
cb();
}
});
}
WriteStream.prototype._write = function(data, encoding, cb) {
this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
writeAll.call(this, data, data.length, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
@ -402,12 +462,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
return this.emit(kIoDone, er);
}
if (er) {
return cb(er);
}
this.bytesWritten += bytes;
cb();
cb(er);
});
if (this.pos !== undefined)
@ -427,7 +482,7 @@ WriteStream.prototype._writev = function(data, cb) {
}
this[kIsPerformingIO] = true;
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
writevAll.call(this, chunks, size, this.pos, (er) => {
this[kIsPerformingIO] = false;
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
@ -435,12 +490,7 @@ WriteStream.prototype._writev = function(data, cb) {
return this.emit(kIoDone, er);
}
if (er) {
return cb(er);
}
this.bytesWritten += bytes;
cb();
cb(er);
});
if (this.pos !== undefined)

View File

@ -0,0 +1,39 @@
import * as common from '../common/index.mjs';
import tmpdir from '../common/tmpdir.js';
import assert from 'node:assert';
import fs from 'node:fs';
import { describe, it, mock } from 'node:test';
import { finished } from 'node:stream/promises';
tmpdir.refresh();
const file = tmpdir.resolve('writeStreamEAGAIN.txt');
const errorWithEAGAIN = (fd, buffer, offset, length, position, callback) => {
callback(Object.assign(new Error(), { code: 'EAGAIN' }), 0, buffer);
};
describe('WriteStream EAGAIN', { concurrency: true }, () => {
it('_write', async () => {
const mockWrite = mock.fn(fs.write);
mockWrite.mock.mockImplementationOnce(errorWithEAGAIN);
const stream = fs.createWriteStream(file, {
fs: {
open: common.mustCall(fs.open),
write: mockWrite,
close: common.mustCall(fs.close),
}
});
stream.end('foo');
stream.on('close', common.mustCall());
stream.on('error', common.mustNotCall());
await finished(stream);
assert.strictEqual(mockWrite.mock.callCount(), 2);
assert.strictEqual(fs.readFileSync(file, 'utf8'), 'foo');
});
it('_write', async () => {
const stream = fs.createWriteStream(file);
mock.getter(stream, 'destroyed', () => true);
stream.end('foo');
await finished(stream).catch(common.mustCall());
});
});