stream: add FileHandle support to Read/WriteStream

Support creating a Read/WriteStream from a
FileHandle instead of a raw file descriptor
Add an EventEmitter to FileHandle with a single
'close' event.

Fixes: https://github.com/nodejs/node/issues/35240

PR-URL: https://github.com/nodejs/node/pull/35922
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Momtchil Momtchev 2020-11-02 13:10:36 +01:00 committed by Antoine du Hamel
parent 897307554a
commit 0fd121e00c
8 changed files with 285 additions and 41 deletions

View File

@ -1761,6 +1761,10 @@ fs.copyFileSync('source.txt', 'destination.txt', COPYFILE_EXCL);
<!-- YAML
added: v0.1.31
changes:
- version:
- REPLACEME
pr-url: https://github.com/nodejs/node/pull/35922
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
@ -1792,7 +1796,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'r'`.
* `encoding` {string} **Default:** `null`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
@ -1868,6 +1872,10 @@ If `options` is a string, then it specifies the encoding.
<!-- YAML
added: v0.1.31
changes:
- version:
- REPLACEME
pr-url: https://github.com/nodejs/node/pull/35922
description: The `fd` option accepts FileHandle arguments.
- version:
- v13.6.0
- v12.17.0
@ -1897,7 +1905,7 @@ changes:
* `flags` {string} See [support of file system `flags`][]. **Default:**
`'w'`.
* `encoding` {string} **Default:** `'utf8'`
* `fd` {integer} **Default:** `null`
* `fd` {integer|FileHandle} **Default:** `null`
* `mode` {integer} **Default:** `0o666`
* `autoClose` {boolean} **Default:** `true`
* `emitClose` {boolean} **Default:** `false`
@ -4707,6 +4715,14 @@ the promise-based API uses the `FileHandle` class in order to help avoid
accidental leaking of unclosed file descriptors after a `Promise` is resolved or
rejected.
#### Event: `'close'`
<!-- YAML
added: REPLACEME
-->
The `'close'` event is emitted when the `FileHandle` and any of its underlying
resources (a file descriptor, for example) have been closed.
#### `filehandle.appendFile(data, options)`
<!-- YAML
added: v10.0.0

View File

@ -4,11 +4,13 @@ const {
ArrayFrom,
Boolean,
Error,
FunctionPrototypeCall,
NumberIsInteger,
ObjectAssign,
ObjectDefineProperties,
ObjectDefineProperty,
ObjectGetOwnPropertyDescriptor,
ObjectGetOwnPropertyDescriptors,
ReflectApply,
SafeMap,
String,
@ -646,8 +648,23 @@ function defineEventHandler(emitter, name) {
enumerable: true
});
}
const EventEmitterMixin = (Superclass) => {
class MixedEventEmitter extends Superclass {
constructor(...args) {
super(...args);
FunctionPrototypeCall(EventEmitter, this);
}
}
const protoProps = ObjectGetOwnPropertyDescriptors(EventEmitter.prototype);
delete protoProps.constructor;
ObjectDefineProperties(MixedEventEmitter.prototype, protoProps);
return MixedEventEmitter;
};
module.exports = {
Event,
EventEmitterMixin,
EventTarget,
NodeEventTarget,
defineEventHandler,

View File

@ -71,6 +71,7 @@ const {
} = require('internal/validators');
const pathModule = require('path');
const { promisify } = require('internal/util');
const { EventEmitterMixin } = require('internal/event_target');
const kHandle = Symbol('kHandle');
const kFd = Symbol('kFd');
@ -78,6 +79,8 @@ const kRefs = Symbol('kRefs');
const kClosePromise = Symbol('kClosePromise');
const kCloseResolve = Symbol('kCloseResolve');
const kCloseReject = Symbol('kCloseReject');
const kRef = Symbol('kRef');
const kUnref = Symbol('kUnref');
const { kUsePromises } = binding;
const {
@ -94,7 +97,7 @@ const lazyDOMException = hideStackFrames((message, name) => {
return new DOMException(message, name);
});
class FileHandle extends JSTransferable {
class FileHandle extends EventEmitterMixin(JSTransferable) {
constructor(filehandle) {
super();
this[kHandle] = filehandle;
@ -197,6 +200,7 @@ class FileHandle extends JSTransferable {
);
}
this.emit('close');
return this[kClosePromise];
}
@ -226,6 +230,22 @@ class FileHandle extends JSTransferable {
this[kHandle] = handle;
this[kFd] = handle.fd;
}
[kRef]() {
this[kRefs]++;
}
[kUnref]() {
this[kRefs]--;
if (this[kRefs] === 0) {
this[kFd] = -1;
PromisePrototypeThen(
this[kHandle].close(),
this[kCloseResolve],
this[kCloseReject]
);
}
}
}
async function fsCall(fn, handle, ...args) {
@ -242,18 +262,10 @@ async function fsCall(fn, handle, ...args) {
}
try {
handle[kRefs]++;
handle[kRef]();
return await fn(handle, ...args);
} finally {
handle[kRefs]--;
if (handle[kRefs] === 0) {
handle[kFd] = -1;
PromisePrototypeThen(
handle[kHandle].close(),
handle[kCloseResolve],
handle[kCloseReject]
);
}
handle[kUnref]();
}
}
@ -712,5 +724,7 @@ module.exports = {
readFile,
},
FileHandle
FileHandle,
kRef,
kUnref,
};

View File

@ -2,21 +2,25 @@
const {
Array,
FunctionPrototypeBind,
MathMin,
ObjectDefineProperty,
ObjectSetPrototypeOf,
PromisePrototypeThen,
ReflectApply,
Symbol,
} = primordials;
const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE
ERR_OUT_OF_RANGE,
ERR_METHOD_NOT_IMPLEMENTED,
} = require('internal/errors').codes;
const { deprecate } = require('internal/util');
const { validateInteger } = require('internal/validators');
const { errorOrDestroy } = require('internal/streams/destroy');
const fs = require('fs');
const { kRef, kUnref, FileHandle } = require('internal/fs/promises');
const { Buffer } = require('buffer');
const {
copyObject,
@ -28,6 +32,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');
const kFs = Symbol('kFs');
const kHandle = Symbol('kHandle');
function _construct(callback) {
const stream = this;
@ -66,6 +71,35 @@ function _construct(callback) {
}
}
// This generates an fs operations structure for a FileHandle
const FileHandleOperations = (handle) => {
return {
open: (path, flags, mode, cb) => {
throw new ERR_METHOD_NOT_IMPLEMENTED('open()');
},
close: (fd, cb) => {
handle[kUnref]();
PromisePrototypeThen(handle.close(),
() => cb(), cb);
},
read: (fd, buf, offset, length, pos, cb) => {
PromisePrototypeThen(handle.read(buf, offset, length, pos),
(r) => cb(null, r.bytesRead, r.buffer),
(err) => cb(err, 0, buf));
},
write: (fd, buf, offset, length, pos, cb) => {
PromisePrototypeThen(handle.write(buf, offset, length, pos),
(r) => cb(null, r.bytesWritten, r.buffer),
(err) => cb(err, 0, buf));
},
writev: (fd, buffers, pos, cb) => {
PromisePrototypeThen(handle.writev(buffers, pos),
(r) => cb(null, r.bytesWritten, r.buffers),
(err) => cb(err, 0, buffers));
}
};
};
function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
@ -80,6 +114,32 @@ function close(stream, err, cb) {
}
}
function importFd(stream, options) {
stream.fd = null;
if (options.fd) {
if (typeof options.fd === 'number') {
// When fd is a raw descriptor, we must keep our fingers crossed
// that the descriptor won't get closed, or worse, replaced with
// another one
// https://github.com/nodejs/node/issues/35862
stream.fd = options.fd;
} else if (typeof options.fd === 'object' &&
options.fd instanceof FileHandle) {
// When fd is a FileHandle we can listen for 'close' events
if (options.fs)
// FileHandle is not supported with custom fs operations
throw new ERR_METHOD_NOT_IMPLEMENTED('FileHandle with fs');
stream[kHandle] = options.fd;
stream.fd = options.fd.fd;
stream[kFs] = FileHandleOperations(stream[kHandle]);
stream[kHandle][kRef]();
options.fd.on('close', FunctionPrototypeBind(stream.close, stream));
} else
throw ERR_INVALID_ARG_TYPE('options.fd',
['number', 'FileHandle'], options.fd);
}
}
function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
@ -115,10 +175,11 @@ function ReadStream(path, options) {
// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'r' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
importFd(this, options);
this.start = options.start;
this.end = options.end;
this.pos = undefined;
@ -287,10 +348,11 @@ function WriteStream(path, options) {
// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
this.fd = options.fd === undefined ? null : options.fd;
this.flags = options.flags === undefined ? 'w' : options.flags;
this.mode = options.mode === undefined ? 0o666 : options.mode;
importFd(this, options);
this.start = options.start;
this.pos = undefined;
this.bytesWritten = 0;

View File

@ -0,0 +1,50 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const assert = require('assert');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const file = path.join(tmpdir.path, 'read_stream_filehandle_worker.txt');
const input = 'hello world';
const { Worker, isMainThread, workerData } = require('worker_threads');
if (isMainThread || !workerData) {
tmpdir.refresh();
fs.writeFileSync(file, input);
fs.promises.open(file, 'r').then((handle) => {
handle.on('close', common.mustNotCall());
new Worker(__filename, {
workerData: { handle },
transferList: [handle]
});
});
fs.promises.open(file, 'r').then((handle) => {
fs.createReadStream(null, { fd: handle });
assert.throws(() => {
new Worker(__filename, {
workerData: { handle },
transferList: [handle]
});
}, {
code: 25,
});
});
} else {
let output = '';
const handle = workerData.handle;
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('data', common.mustCallAtLeast((data) => {
output += data;
}));
stream.on('end', common.mustCall(() => {
handle.close();
assert.strictEqual(output, input);
}));
stream.on('close', common.mustCall());
}

View File

@ -19,33 +19,32 @@ async function read(fileHandle, buffer, offset, length, position) {
fileHandle.read(buffer, offset, length, position);
}
async function validateRead() {
const filePath = path.resolve(tmpDir, 'tmp-read-file.txt');
const fileHandle = await open(filePath, 'w+');
const buffer = Buffer.from('Hello world', 'utf8');
async function validateRead(data, file) {
const filePath = path.resolve(tmpDir, file);
const buffer = Buffer.from(data, 'utf8');
const fd = fs.openSync(filePath, 'w+');
fs.writeSync(fd, buffer, 0, buffer.length);
fs.closeSync(fd);
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(buffer.length, readAsyncHandle.bytesRead);
assert.deepStrictEqual(buffer, readAsyncHandle.buffer);
await fileHandle.close();
}
async function validateEmptyRead() {
const filePath = path.resolve(tmpDir, 'tmp-read-empty-file.txt');
const fileHandle = await open(filePath, 'w+');
const buffer = Buffer.from('', 'utf8');
const streamFileHandle = await open(filePath, 'w+');
const fd = fs.openSync(filePath, 'w+');
fs.writeSync(fd, buffer, 0, buffer.length);
fs.closeSync(fd);
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(buffer.length, readAsyncHandle.bytesRead);
fileHandle.on('close', common.mustCall());
const readAsyncHandle = await read(fileHandle, Buffer.alloc(11), 0, 11, 0);
assert.deepStrictEqual(data.length, readAsyncHandle.bytesRead);
if (data.length)
assert.deepStrictEqual(buffer, readAsyncHandle.buffer);
await fileHandle.close();
const stream = fs.createReadStream(null, { fd: streamFileHandle });
let streamData = Buffer.alloc(0);
for await (const chunk of stream)
streamData = Buffer.from(chunk);
assert.deepStrictEqual(buffer, streamData);
if (data.length)
assert.deepStrictEqual(streamData, readAsyncHandle.buffer);
await streamFileHandle.close();
}
async function validateLargeRead() {
@ -67,9 +66,9 @@ let useConf = false;
tmpdir.refresh();
useConf = value;
await validateRead()
.then(validateEmptyRead)
.then(validateLargeRead)
.then(common.mustCall());
await validateRead('Hello world', 'tmp-read-file.txt')
.then(validateRead('', 'tmp-read-empty-file.txt'))
.then(validateLargeRead)
.then(common.mustCall());
}
});
})().then(common.mustCall());

View File

@ -0,0 +1,65 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const assert = require('assert');
const path = require('path');
const tmpdir = require('../common/tmpdir');
const file = path.join(tmpdir.path, 'read_stream_filehandle_test.txt');
const input = 'hello world';
let output = '';
tmpdir.refresh();
fs.writeFileSync(file, input);
fs.promises.open(file, 'r').then(common.mustCall((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('data', common.mustCallAtLeast((data) => {
output += data;
}));
stream.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
}));
stream.on('close', common.mustCall());
}));
fs.promises.open(file, 'r').then(common.mustCall((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('data', common.mustNotCall());
stream.on('close', common.mustCall());
handle.close();
}));
fs.promises.open(file, 'r').then(common.mustCall((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('close', common.mustCall());
stream.on('data', common.mustCall(() => {
handle.close();
}));
}));
fs.promises.open(file, 'r').then(common.mustCall((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createReadStream(null, { fd: handle });
stream.on('close', common.mustCall());
stream.close();
}));
fs.promises.open(file, 'r').then(common.mustCall((handle) => {
assert.throws(() => {
fs.createReadStream(null, { fd: handle, fs });
}, {
code: 'ERR_METHOD_NOT_IMPLEMENTED',
name: 'Error',
message: 'The FileHandle with fs method is not implemented'
});
handle.close();
}));

View File

@ -0,0 +1,21 @@
'use strict';
const common = require('../common');
const fs = require('fs');
const path = require('path');
const assert = require('assert');
const tmpdir = require('../common/tmpdir');
const file = path.join(tmpdir.path, 'write_stream_filehandle_test.txt');
const input = 'hello world';
tmpdir.refresh();
fs.promises.open(file, 'w+').then(common.mustCall((handle) => {
handle.on('close', common.mustCall());
const stream = fs.createWriteStream(null, { fd: handle });
stream.end(input);
stream.on('close', common.mustCall(() => {
const output = fs.readFileSync(file, 'utf-8');
assert.strictEqual(output, input);
}));
}));