diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 813efe731fc..54c2ba076f1 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -9,7 +9,13 @@ The `node:worker_threads` module enables the use of threads that execute JavaScript in parallel. To access it: -```js +```mjs +import worker from 'node:worker_threads'; +``` + +```cjs +'use strict'; + const worker = require('node:worker_threads'); ``` @@ -21,9 +27,43 @@ Unlike `child_process` or `cluster`, `worker_threads` can share memory. They do so by transferring `ArrayBuffer` instances or sharing `SharedArrayBuffer` instances. -```js +```mjs +import { + Worker, + isMainThread, + parentPort, + workerData, +} from 'node:worker_threads'; + +if (!isMainThread) { + const { parse } = await import('some-js-parsing-library'); + const script = workerData; + parentPort.postMessage(parse(script)); +} + +export default function parseJSAsync(script) { + return new Promise((resolve, reject) => { + const worker = new Worker(new URL(import.meta.url), { + workerData: script, + }); + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) + reject(new Error(`Worker stopped with exit code ${code}`)); + }); + }); +}; +``` + +```cjs +'use strict'; + const { - Worker, isMainThread, parentPort, workerData, + Worker, + isMainThread, + parentPort, + workerData, } = require('node:worker_threads'); if (isMainThread) { @@ -84,7 +124,25 @@ of data passed to the spawning thread's `worker.setEnvironmentData()`. Every new `Worker` receives its own copy of the environment data automatically. -```js +```mjs +import { + Worker, + isMainThread, + setEnvironmentData, + getEnvironmentData, +} from 'node:worker_threads'; + +if (isMainThread) { + setEnvironmentData('Hello', 'World!'); + const worker = new Worker(new URL(import.meta.url)); +} else { + console.log(getEnvironmentData('Hello')); // Prints 'World!'. +} +``` + +```cjs +'use strict'; + const { Worker, isMainThread, @@ -116,12 +174,6 @@ Is `true` if this code is running inside of an internal [`Worker`][] thread (e.g node --experimental-loader ./loader.js main.js ``` -```cjs -// loader.js -const { isInternalThread } = require('node:worker_threads'); -console.log(isInternalThread); // true -``` - ```mjs // loader.js import { isInternalThread } from 'node:worker_threads'; @@ -129,14 +181,24 @@ console.log(isInternalThread); // true ``` ```cjs -// main.js +// loader.js +'use strict'; + const { isInternalThread } = require('node:worker_threads'); +console.log(isInternalThread); // true +``` + +```mjs +// main.js +import { isInternalThread } from 'node:worker_threads'; console.log(isInternalThread); // false ``` -```mjs +```cjs // main.js -import { isInternalThread } from 'node:worker_threads'; +'use strict'; + +const { isInternalThread } = require('node:worker_threads'); console.log(isInternalThread); // false ``` @@ -150,7 +212,21 @@ added: v10.5.0 Is `true` if this code is not running inside of a [`Worker`][] thread. -```js +```mjs +import { Worker, isMainThread } from 'node:worker_threads'; + +if (isMainThread) { + // This re-loads the current file inside a Worker instance. + new Worker(new URL(import.meta.url)); +} else { + console.log('Inside Worker!'); + console.log(isMainThread); // Prints 'false'. +} +``` + +```cjs +'use strict'; + const { Worker, isMainThread } = require('node:worker_threads'); if (isMainThread) { @@ -183,7 +259,35 @@ For example, Node.js marks the `ArrayBuffer`s it uses for its This operation cannot be undone. -```js +```mjs +import { MessageChannel, markAsUntransferable } from 'node:worker_threads'; + +const pooledBuffer = new ArrayBuffer(8); +const typedArray1 = new Uint8Array(pooledBuffer); +const typedArray2 = new Float64Array(pooledBuffer); + +markAsUntransferable(pooledBuffer); + +const { port1 } = new MessageChannel(); +try { + // This will throw an error, because pooledBuffer is not transferable. + port1.postMessage(typedArray1, [ typedArray1.buffer ]); +} catch (error) { + // error.name === 'DataCloneError' +} + +// The following line prints the contents of typedArray1 -- it still owns +// its memory and has not been transferred. Without +// `markAsUntransferable()`, this would print an empty Uint8Array and the +// postMessage call would have succeeded. +// typedArray2 is intact as well. +console.log(typedArray1); +console.log(typedArray2); +``` + +```cjs +'use strict'; + const { MessageChannel, markAsUntransferable } = require('node:worker_threads'); const pooledBuffer = new ArrayBuffer(8); @@ -223,7 +327,18 @@ added: v21.0.0 Check if an object is marked as not transferable with [`markAsUntransferable()`][]. -```js +```mjs +import { markAsUntransferable, isMarkedAsUntransferable } from 'node:worker_threads'; + +const pooledBuffer = new ArrayBuffer(8); +markAsUntransferable(pooledBuffer); + +isMarkedAsUntransferable(pooledBuffer); // Returns true. +``` + +```cjs +'use strict'; + const { markAsUntransferable, isMarkedAsUntransferable } = require('node:worker_threads'); const pooledBuffer = new ArrayBuffer(8); @@ -252,7 +367,23 @@ This has no effect on `ArrayBuffer`, or any `Buffer` like objects. This operation cannot be undone. -```js +```mjs +import { markAsUncloneable } from 'node:worker_threads'; + +const anyObject = { foo: 'bar' }; +markAsUncloneable(anyObject); +const { port1 } = new MessageChannel(); +try { + // This will throw an error, because anyObject is not cloneable. + port1.postMessage(anyObject); +} catch (error) { + // error.name === 'DataCloneError' +} +``` + +```cjs +'use strict'; + const { markAsUncloneable } = require('node:worker_threads'); const anyObject = { foo: 'bar' }; @@ -309,7 +440,26 @@ using `worker.on('message')`, and messages sent from the parent thread using `worker.postMessage()` are available in this thread using `parentPort.on('message')`. -```js +```mjs +import { Worker, isMainThread, parentPort } from 'node:worker_threads'; + +if (isMainThread) { + const worker = new Worker(new URL(import.meta.url)); + worker.once('message', (message) => { + console.log(message); // Prints 'Hello, world!'. + }); + worker.postMessage('Hello, world!'); +} else { + // When a message from the parent thread is received, send it back: + parentPort.once('message', (message) => { + parentPort.postMessage(message); + }); +} +``` + +```cjs +'use strict'; + const { Worker, isMainThread, parentPort } = require('node:worker_threads'); if (isMainThread) { @@ -365,7 +515,6 @@ The example below shows the use of of `postMessageToThread`: it creates 10 neste the last one will try to communicate with the main thread. ```mjs -import { fileURLToPath } from 'node:url'; import process from 'node:process'; import { postMessageToThread, @@ -378,7 +527,7 @@ const channel = new BroadcastChannel('sync'); const level = workerData?.level ?? 0; if (level < 10) { - const worker = new Worker(fileURLToPath(import.meta.url), { + const worker = new Worker(new URL(import.meta.url), { workerData: { level: level + 1 }, }); } @@ -402,6 +551,9 @@ channel.onmessage = channel.close; ``` ```cjs +'use strict'; + +const process = require('node:process'); const { postMessageToThread, threadId, @@ -455,7 +607,20 @@ Receive a single message from a given `MessagePort`. If no message is available, that contains the message payload, corresponding to the oldest message in the `MessagePort`'s queue. -```js +```mjs +import { MessageChannel, receiveMessageOnPort } from 'node:worker_threads'; +const { port1, port2 } = new MessageChannel(); +port1.postMessage({ hello: 'world' }); + +console.log(receiveMessageOnPort(port2)); +// Prints: { message: { hello: 'world' } } +console.log(receiveMessageOnPort(port2)); +// Prints: undefined +``` + +```cjs +'use strict'; + const { MessageChannel, receiveMessageOnPort } = require('node:worker_threads'); const { port1, port2 } = new MessageChannel(); port1.postMessage({ hello: 'world' }); @@ -501,7 +666,18 @@ A special value that can be passed as the `env` option of the [`Worker`][] constructor, to indicate that the current thread and the Worker thread should share read and write access to the same set of environment variables. -```js +```mjs +import process from 'node:process'; +import { Worker, SHARE_ENV } from 'node:worker_threads'; +new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV }) + .on('exit', () => { + console.log(process.env.SET_IN_WORKER); // Prints 'foo'. + }); +``` + +```cjs +'use strict'; + const { Worker, SHARE_ENV } = require('node:worker_threads'); new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV }) .on('exit', () => { @@ -557,7 +733,19 @@ to this thread's `Worker` constructor. The data is cloned as if using [`postMessage()`][`port.postMessage()`], according to the [HTML structured clone algorithm][]. -```js +```mjs +import { Worker, isMainThread, workerData } from 'node:worker_threads'; + +if (isMainThread) { + const worker = new Worker(new URL(import.meta.url), { workerData: 'Hello, world!' }); +} else { + console.log(workerData); // Prints 'Hello, world!'. +} +``` + +```cjs +'use strict'; + const { Worker, isMainThread, workerData } = require('node:worker_threads'); if (isMainThread) { @@ -580,7 +768,30 @@ changes: Instances of `BroadcastChannel` allow asynchronous one-to-many communication with all other `BroadcastChannel` instances bound to the same channel name. -```js +```mjs +import { + isMainThread, + BroadcastChannel, + Worker, +} from 'node:worker_threads'; + +const bc = new BroadcastChannel('hello'); + +if (isMainThread) { + let c = 0; + bc.onmessage = (event) => { + console.log(event.data); + if (++c === 10) bc.close(); + }; + for (let n = 0; n < 10; n++) + new Worker(new URL(import.meta.url)); +} else { + bc.postMessage('hello from every worker'); + bc.close(); +} +``` + +```cjs 'use strict'; const { @@ -681,7 +892,18 @@ The `MessageChannel` has no methods of its own. `new MessageChannel()` yields an object with `port1` and `port2` properties, which refer to linked [`MessagePort`][] instances. -```js +```mjs +import { MessageChannel } from 'node:worker_threads'; + +const { port1, port2 } = new MessageChannel(); +port1.on('message', (message) => console.log('received', message)); +port2.postMessage({ foo: 'bar' }); +// Prints: received { foo: 'bar' } from the `port1.on('message')` listener +``` + +```cjs +'use strict'; + const { MessageChannel } = require('node:worker_threads'); const { port1, port2 } = new MessageChannel(); @@ -720,7 +942,23 @@ added: v10.5.0 The `'close'` event is emitted once either side of the channel has been disconnected. -```js +```mjs +import { MessageChannel } from 'node:worker_threads'; +const { port1, port2 } = new MessageChannel(); + +// Prints: +// foobar +// closed! +port2.on('message', (message) => console.log(message)); +port2.on('close', () => console.log('closed!')); + +port1.postMessage('foobar'); +port1.close(); +``` + +```cjs +'use strict'; + const { MessageChannel } = require('node:worker_threads'); const { port1, port2 } = new MessageChannel(); @@ -841,7 +1079,21 @@ In particular, the significant differences to `JSON` are: * {net.SocketAddress}es, * {X509Certificate}s. -```js +```mjs +import { MessageChannel } from 'node:worker_threads'; +const { port1, port2 } = new MessageChannel(); + +port1.on('message', (message) => console.log(message)); + +const circularData = {}; +circularData.foo = circularData; +// Prints: { foo: [Circular] } +port2.postMessage(circularData); +``` + +```cjs +'use strict'; + const { MessageChannel } = require('node:worker_threads'); const { port1, port2 } = new MessageChannel(); @@ -866,7 +1118,33 @@ from either thread. They cannot be listed in `transferList`. `value` may still contain `ArrayBuffer` instances that are not in `transferList`; in that case, the underlying memory is copied rather than moved. -```js +```mjs +import { MessageChannel } from 'node:worker_threads'; +const { port1, port2 } = new MessageChannel(); + +port1.on('message', (message) => console.log(message)); + +const uint8Array = new Uint8Array([ 1, 2, 3, 4 ]); +// This posts a copy of `uint8Array`: +port2.postMessage(uint8Array); +// This does not copy data, but renders `uint8Array` unusable: +port2.postMessage(uint8Array, [ uint8Array.buffer ]); + +// The memory for the `sharedUint8Array` is accessible from both the +// original and the copy received by `.on('message')`: +const sharedUint8Array = new Uint8Array(new SharedArrayBuffer(4)); +port2.postMessage(sharedUint8Array); + +// This transfers a freshly created message port to the receiver. +// This can be used, for example, to create communication channels between +// multiple `Worker` threads that are children of the same parent thread. +const otherChannel = new MessageChannel(); +port2.postMessage({ port: otherChannel.port1 }, [ otherChannel.port1 ]); +``` + +```cjs +'use strict'; + const { MessageChannel } = require('node:worker_threads'); const { port1, port2 } = new MessageChannel(); @@ -1105,7 +1383,30 @@ See [`port.postMessage()`][] for more information on how messages are passed, and what kind of JavaScript values can be successfully transported through the thread barrier. -```js +```mjs +import assert from 'node:assert'; +import { + Worker, MessageChannel, MessagePort, isMainThread, parentPort, +} from 'node:worker_threads'; +if (isMainThread) { + const worker = new Worker(new URL(import.meta.url)); + const subChannel = new MessageChannel(); + worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]); + subChannel.port2.on('message', (value) => { + console.log('received:', value); + }); +} else { + parentPort.once('message', (value) => { + assert(value.hereIsYourPort instanceof MessagePort); + value.hereIsYourPort.postMessage('the worker is sending this'); + value.hereIsYourPort.close(); + }); +} +``` + +```cjs +'use strict'; + const assert = require('node:assert'); const { Worker, MessageChannel, MessagePort, isMainThread, parentPort, @@ -1391,7 +1692,29 @@ stuck in bootstrap. The following examples shows how the worker's entire lifetime never accumulates any `idle` time, but is still be able to process messages. -```js +```mjs +import { Worker, isMainThread, parentPort } from 'node:worker_threads'; + +if (isMainThread) { + const worker = new Worker(new URL(import.meta.url)); + setInterval(() => { + worker.postMessage('hi'); + console.log(worker.performance.eventLoopUtilization()); + }, 100).unref(); +} else { + parentPort.on('message', () => console.log('msg')).unref(); + (function r(n) { + if (--n < 0) return; + const t = Date.now(); + while (Date.now() - t < 300); + setImmediate(r, n); + })(10); +} +``` + +```cjs +'use strict'; + const { Worker, isMainThread, parentPort } = require('node:worker_threads'); if (isMainThread) { @@ -1400,16 +1723,15 @@ if (isMainThread) { worker.postMessage('hi'); console.log(worker.performance.eventLoopUtilization()); }, 100).unref(); - return; +} else { + parentPort.on('message', () => console.log('msg')).unref(); + (function r(n) { + if (--n < 0) return; + const t = Date.now(); + while (Date.now() - t < 300); + setImmediate(r, n); + })(10); } - -parentPort.on('message', () => console.log('msg')).unref(); -(function r(n) { - if (--n < 0) return; - const t = Date.now(); - while (Date.now() - t < 300); - setImmediate(r, n); -})(10); ``` The event loop utilization of a worker is available only after the [`'online'`