mirror of https://github.com/nodejs/node.git
worker: add ports property to MessageEvents
Add `ev.ports` for spec compliancy. Since we only emit the raw `data` value, and only create the `MessageEvent` instance if there are EventTarget-style listeners, we store the ports list temporarily on the MessagePort object itself, so that we can look it up when we need to create the event object. Fixes: https://github.com/nodejs/node/issues/37358 PR-URL: https://github.com/nodejs/node/pull/37538 Reviewed-By: Michaël Zasso <targos@protonmail.com> Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
This commit is contained in:
parent
9dd2f9f19d
commit
007a85ce83
|
@ -4,22 +4,30 @@ const {
|
||||||
} = primordials;
|
} = primordials;
|
||||||
|
|
||||||
class MessageEvent {
|
class MessageEvent {
|
||||||
constructor(data, target, type) {
|
constructor(data, target, type, ports) {
|
||||||
this.data = data;
|
this.data = data;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
this.type = type;
|
this.type = type;
|
||||||
|
this.ports = ports ?? [];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
|
const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
|
||||||
|
const kCurrentlyReceivingPorts =
|
||||||
|
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
|
||||||
|
|
||||||
exports.emitMessage = function(data, type) {
|
exports.emitMessage = function(data, ports, type) {
|
||||||
if (typeof this[kHybridDispatch] === 'function') {
|
if (typeof this[kHybridDispatch] === 'function') {
|
||||||
this[kHybridDispatch](data, type, undefined);
|
this[kCurrentlyReceivingPorts] = ports;
|
||||||
|
try {
|
||||||
|
this[kHybridDispatch](data, type, undefined);
|
||||||
|
} finally {
|
||||||
|
this[kCurrentlyReceivingPorts] = undefined;
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const event = new MessageEvent(data, this, type);
|
const event = new MessageEvent(data, this, type, ports);
|
||||||
if (type === 'message') {
|
if (type === 'message') {
|
||||||
if (typeof this.onmessage === 'function')
|
if (typeof this.onmessage === 'function')
|
||||||
this.onmessage(event);
|
this.onmessage(event);
|
||||||
|
|
|
@ -15,6 +15,7 @@ const {
|
||||||
ObjectSetPrototypeOf,
|
ObjectSetPrototypeOf,
|
||||||
ReflectApply,
|
ReflectApply,
|
||||||
Symbol,
|
Symbol,
|
||||||
|
SymbolFor,
|
||||||
} = primordials;
|
} = primordials;
|
||||||
|
|
||||||
const {
|
const {
|
||||||
|
@ -70,6 +71,8 @@ const kWritableCallbacks = Symbol('kWritableCallbacks');
|
||||||
const kSource = Symbol('kSource');
|
const kSource = Symbol('kSource');
|
||||||
const kStartedReading = Symbol('kStartedReading');
|
const kStartedReading = Symbol('kStartedReading');
|
||||||
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
|
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
|
||||||
|
const kCurrentlyReceivingPorts =
|
||||||
|
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');
|
||||||
|
|
||||||
const messageTypes = {
|
const messageTypes = {
|
||||||
UP_AND_RUNNING: 'upAndRunning',
|
UP_AND_RUNNING: 'upAndRunning',
|
||||||
|
@ -150,7 +153,9 @@ ObjectDefineProperty(
|
||||||
if (type !== 'message' && type !== 'messageerror') {
|
if (type !== 'message' && type !== 'messageerror') {
|
||||||
return ReflectApply(originalCreateEvent, this, arguments);
|
return ReflectApply(originalCreateEvent, this, arguments);
|
||||||
}
|
}
|
||||||
return new MessageEvent(type, { data });
|
const ports = this[kCurrentlyReceivingPorts];
|
||||||
|
this[kCurrentlyReceivingPorts] = undefined;
|
||||||
|
return new MessageEvent(type, { data, ports });
|
||||||
},
|
},
|
||||||
configurable: false,
|
configurable: false,
|
||||||
writable: false,
|
writable: false,
|
||||||
|
@ -161,6 +166,7 @@ ObjectDefineProperty(
|
||||||
function oninit() {
|
function oninit() {
|
||||||
initNodeEventTarget(this);
|
initNodeEventTarget(this);
|
||||||
setupPortReferencing(this, this, 'message');
|
setupPortReferencing(this, this, 'message');
|
||||||
|
this[kCurrentlyReceivingPorts] = undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
defineEventHandler(MessagePort.prototype, 'message');
|
defineEventHandler(MessagePort.prototype, 'message');
|
||||||
|
|
|
@ -126,11 +126,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
|
||||||
} // anonymous namespace
|
} // anonymous namespace
|
||||||
|
|
||||||
MaybeLocal<Value> Message::Deserialize(Environment* env,
|
MaybeLocal<Value> Message::Deserialize(Environment* env,
|
||||||
Local<Context> context) {
|
Local<Context> context,
|
||||||
|
Local<Value>* port_list) {
|
||||||
|
Context::Scope context_scope(context);
|
||||||
|
|
||||||
CHECK(!IsCloseMessage());
|
CHECK(!IsCloseMessage());
|
||||||
|
if (port_list != nullptr && !transferables_.empty()) {
|
||||||
|
// Need to create this outside of the EscapableHandleScope, but inside
|
||||||
|
// the Context::Scope.
|
||||||
|
*port_list = Array::New(env->isolate());
|
||||||
|
}
|
||||||
|
|
||||||
EscapableHandleScope handle_scope(env->isolate());
|
EscapableHandleScope handle_scope(env->isolate());
|
||||||
Context::Scope context_scope(context);
|
|
||||||
|
|
||||||
// Create all necessary objects for transferables, e.g. MessagePort handles.
|
// Create all necessary objects for transferables, e.g. MessagePort handles.
|
||||||
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
|
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
|
||||||
|
@ -146,10 +153,27 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
|
||||||
});
|
});
|
||||||
|
|
||||||
for (uint32_t i = 0; i < transferables_.size(); ++i) {
|
for (uint32_t i = 0; i < transferables_.size(); ++i) {
|
||||||
|
HandleScope handle_scope(env->isolate());
|
||||||
TransferData* data = transferables_[i].get();
|
TransferData* data = transferables_[i].get();
|
||||||
host_objects[i] = data->Deserialize(
|
host_objects[i] = data->Deserialize(
|
||||||
env, context, std::move(transferables_[i]));
|
env, context, std::move(transferables_[i]));
|
||||||
if (!host_objects[i]) return {};
|
if (!host_objects[i]) return {};
|
||||||
|
if (port_list != nullptr) {
|
||||||
|
// If we gather a list of all message ports, and this transferred object
|
||||||
|
// is a message port, add it to that list. This is a bit of an odd case
|
||||||
|
// of special handling for MessagePorts (as opposed to applying to all
|
||||||
|
// transferables), but it's required for spec compliancy.
|
||||||
|
DCHECK((*port_list)->IsArray());
|
||||||
|
Local<Array> port_list_array = port_list->As<Array>();
|
||||||
|
Local<Object> obj = host_objects[i]->object();
|
||||||
|
if (env->message_port_constructor_template()->HasInstance(obj)) {
|
||||||
|
if (port_list_array->Set(context,
|
||||||
|
port_list_array->Length(),
|
||||||
|
obj).IsNothing()) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
transferables_.clear();
|
transferables_.clear();
|
||||||
|
|
||||||
|
@ -664,7 +688,8 @@ MessagePort* MessagePort::New(
|
||||||
}
|
}
|
||||||
|
|
||||||
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
|
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
|
||||||
MessageProcessingMode mode) {
|
MessageProcessingMode mode,
|
||||||
|
Local<Value>* port_list) {
|
||||||
std::shared_ptr<Message> received;
|
std::shared_ptr<Message> received;
|
||||||
{
|
{
|
||||||
// Get the head of the message queue.
|
// Get the head of the message queue.
|
||||||
|
@ -696,7 +721,7 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
|
||||||
|
|
||||||
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
|
if (!env()->can_call_into_js()) return MaybeLocal<Value>();
|
||||||
|
|
||||||
return received->Deserialize(env(), context);
|
return received->Deserialize(env(), context, port_list);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessagePort::OnMessage(MessageProcessingMode mode) {
|
void MessagePort::OnMessage(MessageProcessingMode mode) {
|
||||||
|
@ -735,14 +760,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
|
||||||
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
|
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
|
||||||
|
|
||||||
Local<Value> payload;
|
Local<Value> payload;
|
||||||
|
Local<Value> port_list = Undefined(env()->isolate());
|
||||||
Local<Value> message_error;
|
Local<Value> message_error;
|
||||||
Local<Value> argv[2];
|
Local<Value> argv[3];
|
||||||
|
|
||||||
{
|
{
|
||||||
// Catch any exceptions from parsing the message itself (not from
|
// Catch any exceptions from parsing the message itself (not from
|
||||||
// emitting it) as 'messageeror' events.
|
// emitting it) as 'messageeror' events.
|
||||||
TryCatchScope try_catch(env());
|
TryCatchScope try_catch(env());
|
||||||
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
|
if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
|
||||||
if (try_catch.HasCaught() && !try_catch.HasTerminated())
|
if (try_catch.HasCaught() && !try_catch.HasTerminated())
|
||||||
message_error = try_catch.Exception();
|
message_error = try_catch.Exception();
|
||||||
goto reschedule;
|
goto reschedule;
|
||||||
|
@ -757,13 +783,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
argv[0] = payload;
|
argv[0] = payload;
|
||||||
argv[1] = env()->message_string();
|
argv[1] = port_list;
|
||||||
|
argv[2] = env()->message_string();
|
||||||
|
|
||||||
if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
|
if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
|
||||||
reschedule:
|
reschedule:
|
||||||
if (!message_error.IsEmpty()) {
|
if (!message_error.IsEmpty()) {
|
||||||
argv[0] = message_error;
|
argv[0] = message_error;
|
||||||
argv[1] = env()->messageerror_string();
|
argv[1] = Undefined(env()->isolate());
|
||||||
|
argv[2] = env()->messageerror_string();
|
||||||
USE(MakeCallback(emit_message, arraysize(argv), argv));
|
USE(MakeCallback(emit_message, arraysize(argv), argv));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,8 +62,10 @@ class Message : public MemoryRetainer {
|
||||||
|
|
||||||
// Deserialize the contained JS value. May only be called once, and only
|
// Deserialize the contained JS value. May only be called once, and only
|
||||||
// after Serialize() has been called (e.g. by another thread).
|
// after Serialize() has been called (e.g. by another thread).
|
||||||
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
|
v8::MaybeLocal<v8::Value> Deserialize(
|
||||||
v8::Local<v8::Context> context);
|
Environment* env,
|
||||||
|
v8::Local<v8::Context> context,
|
||||||
|
v8::Local<v8::Value>* port_list = nullptr);
|
||||||
|
|
||||||
// Serialize a JS value, and optionally transfer objects, into this message.
|
// Serialize a JS value, and optionally transfer objects, into this message.
|
||||||
// The Message object retains ownership of all transferred objects until
|
// The Message object retains ownership of all transferred objects until
|
||||||
|
@ -293,8 +295,10 @@ class MessagePort : public HandleWrap {
|
||||||
void OnClose() override;
|
void OnClose() override;
|
||||||
void OnMessage(MessageProcessingMode mode);
|
void OnMessage(MessageProcessingMode mode);
|
||||||
void TriggerAsync();
|
void TriggerAsync();
|
||||||
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
|
v8::MaybeLocal<v8::Value> ReceiveMessage(
|
||||||
MessageProcessingMode mode);
|
v8::Local<v8::Context> context,
|
||||||
|
MessageProcessingMode mode,
|
||||||
|
v8::Local<v8::Value>* port_list = nullptr);
|
||||||
|
|
||||||
std::unique_ptr<MessagePortData> data_ = nullptr;
|
std::unique_ptr<MessagePortData> data_ = nullptr;
|
||||||
bool receiving_messages_ = false;
|
bool receiving_messages_ = false;
|
||||||
|
|
|
@ -34,9 +34,13 @@ vm.runInContext('(' + function() {
|
||||||
|
|
||||||
assert(!(port instanceof MessagePort));
|
assert(!(port instanceof MessagePort));
|
||||||
assert.strictEqual(port.onmessage, undefined);
|
assert.strictEqual(port.onmessage, undefined);
|
||||||
port.onmessage = function({ data }) {
|
port.onmessage = function({ data, ports }) {
|
||||||
assert(data instanceof Object);
|
assert(data instanceof Object);
|
||||||
port.postMessage(data);
|
assert(ports instanceof Array);
|
||||||
|
assert.strictEqual(ports.length, 1);
|
||||||
|
assert.strictEqual(ports[0], data.p);
|
||||||
|
assert(!(data.p instanceof MessagePort));
|
||||||
|
port.postMessage({});
|
||||||
};
|
};
|
||||||
port.start();
|
port.start();
|
||||||
}
|
}
|
||||||
|
@ -55,8 +59,10 @@ vm.runInContext('(' + function() {
|
||||||
}
|
}
|
||||||
} + ')()', context);
|
} + ')()', context);
|
||||||
|
|
||||||
|
const otherChannel = new MessageChannel();
|
||||||
port2.on('message', common.mustCall((msg) => {
|
port2.on('message', common.mustCall((msg) => {
|
||||||
assert(msg instanceof Object);
|
assert(msg instanceof Object);
|
||||||
port2.close();
|
port2.close();
|
||||||
|
otherChannel.port2.close();
|
||||||
}));
|
}));
|
||||||
port2.postMessage({});
|
port2.postMessage({ p: otherChannel.port1 }, [ otherChannel.port1 ]);
|
||||||
|
|
|
@ -34,6 +34,7 @@ const { MessageChannel, MessagePort } = require('worker_threads');
|
||||||
port1.onmessage = common.mustCall((message) => {
|
port1.onmessage = common.mustCall((message) => {
|
||||||
assert.strictEqual(message.data, 4);
|
assert.strictEqual(message.data, 4);
|
||||||
assert.strictEqual(message.target, port1);
|
assert.strictEqual(message.target, port1);
|
||||||
|
assert.deepStrictEqual(message.ports, []);
|
||||||
port2.close(common.mustCall());
|
port2.close(common.mustCall());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -161,6 +162,19 @@ const { MessageChannel, MessagePort } = require('worker_threads');
|
||||||
port1.close();
|
port1.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// Test MessageEvent#ports
|
||||||
|
const c1 = new MessageChannel();
|
||||||
|
const c2 = new MessageChannel();
|
||||||
|
c1.port1.postMessage({ port: c2.port2 }, [ c2.port2 ]);
|
||||||
|
c1.port2.addEventListener('message', common.mustCall((ev) => {
|
||||||
|
assert.strictEqual(ev.ports.length, 1);
|
||||||
|
assert.strictEqual(ev.ports[0].constructor, MessagePort);
|
||||||
|
c1.port1.close();
|
||||||
|
c2.port1.close();
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
assert.deepStrictEqual(
|
assert.deepStrictEqual(
|
||||||
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
|
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
|
||||||
|
|
Loading…
Reference in New Issue