feat: allow ensureDelivery to be able to ensure delivery on emit
Signed-off-by: Remi Cattiau <remi@cattiau.com>
This commit is contained in:
parent
d418a50c56
commit
43d9e01972
|
@ -107,7 +107,7 @@ function sendWithAxios(message) {
|
||||||
|
|
||||||
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
|
const emit = emitterFor(sendWithAxios, { mode: Mode.BINARY });
|
||||||
// Set the emit
|
// Set the emit
|
||||||
Emitter.getSingleton().on("event", emit);
|
Emitter.on("cloudevent", emit);
|
||||||
|
|
||||||
...
|
...
|
||||||
// In any part of the code will send the event
|
// In any part of the code will send the event
|
||||||
|
|
|
@ -171,10 +171,11 @@ export class CloudEvent implements CloudEventV1, CloudEventV03 {
|
||||||
/**
|
/**
|
||||||
* Emit this CloudEvent through the application
|
* Emit this CloudEvent through the application
|
||||||
*
|
*
|
||||||
* @return {CloudEvent} current CloudEvent object
|
* @param {boolean} ensureDelivery fail the promise if one listener fail
|
||||||
|
* @return {Promise<CloudEvent>} this
|
||||||
*/
|
*/
|
||||||
public emit(): this {
|
public async emit(ensureDelivery = true): Promise<this> {
|
||||||
Emitter.emitEvent(this);
|
await Emitter.emitEvent(this, ensureDelivery);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ import { CloudEvent, Version } from "./event/cloudevent";
|
||||||
import { ValidationError } from "./event/validation";
|
import { ValidationError } from "./event/validation";
|
||||||
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
|
import { CloudEventV03, CloudEventV03Attributes, CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
|
||||||
|
|
||||||
import { Options, TransportFunction, EmitterFunction, emitterFor } from "./transport/emitter";
|
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
|
||||||
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";
|
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";
|
||||||
|
|
||||||
import CONSTANTS from "./constants";
|
import CONSTANTS from "./constants";
|
||||||
|
@ -28,6 +28,7 @@ export {
|
||||||
TransportFunction,
|
TransportFunction,
|
||||||
EmitterFunction,
|
EmitterFunction,
|
||||||
emitterFor,
|
emitterFor,
|
||||||
|
Emitter,
|
||||||
Options,
|
Options,
|
||||||
// From Constants
|
// From Constants
|
||||||
CONSTANTS,
|
CONSTANTS,
|
||||||
|
|
|
@ -67,7 +67,7 @@ export class Emitter extends EventEmitter {
|
||||||
/**
|
/**
|
||||||
* Singleton store
|
* Singleton store
|
||||||
*/
|
*/
|
||||||
static singleton: Emitter | undefined = undefined;
|
static instance: Emitter | undefined = undefined;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an Emitter
|
* Create an Emitter
|
||||||
|
@ -83,20 +83,42 @@ export class Emitter extends EventEmitter {
|
||||||
*
|
*
|
||||||
* @return {Emitter} return Emitter singleton
|
* @return {Emitter} return Emitter singleton
|
||||||
*/
|
*/
|
||||||
static getSingleton(): Emitter {
|
static getInstance(): Emitter {
|
||||||
if (!Emitter.singleton) {
|
if (!Emitter.instance) {
|
||||||
Emitter.singleton = new Emitter();
|
Emitter.instance = new Emitter();
|
||||||
}
|
}
|
||||||
return Emitter.singleton;
|
return Emitter.instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a listener for eventing
|
||||||
|
*
|
||||||
|
* @param {string} event type to listen to
|
||||||
|
* @param {Function} listener to call on event
|
||||||
|
* @return {void}
|
||||||
|
*/
|
||||||
|
static on(event: "cloudevent" | "newListener" | "removeListener", listener: (...args: any[]) => void): void {
|
||||||
|
this.getInstance().on(event, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Emit an event inside this application
|
* Emit an event inside this application
|
||||||
*
|
*
|
||||||
* @param {CloudEvent} event to emit
|
* @param {CloudEvent} event to emit
|
||||||
|
* @param {boolean} ensureDelivery fail the promise if one listener fail
|
||||||
* @return {void}
|
* @return {void}
|
||||||
*/
|
*/
|
||||||
static emitEvent(event: CloudEvent): void {
|
static async emitEvent(event: CloudEvent, ensureDelivery = true): Promise<void> {
|
||||||
this.getSingleton().emit("event", event);
|
if (!ensureDelivery) {
|
||||||
|
// Ensure delivery is disabled so we don't wait for Promise
|
||||||
|
Emitter.getInstance().emit("cloudevent", event);
|
||||||
|
} else {
|
||||||
|
// Execute all listeners and wrap them in a Promise
|
||||||
|
await Promise.all(
|
||||||
|
Emitter.getInstance()
|
||||||
|
.listeners("cloudevent")
|
||||||
|
.map(async (l) => l(event)),
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
import "mocha";
|
||||||
|
|
||||||
|
import { emitterFor, HTTP, Mode, Message, Emitter } from "../../src";
|
||||||
|
|
||||||
|
import { fixture, assertStructured } from "./emitter_factory_test";
|
||||||
|
|
||||||
|
import { rejects, doesNotReject } from "assert";
|
||||||
|
|
||||||
|
describe("Emitter Singleton", () => {
|
||||||
|
it("emit a Node.js 'cloudevent' event as an EventEmitter", async () => {
|
||||||
|
const msg: Message | unknown = await new Promise((resolve) => {
|
||||||
|
const fn = async (message: Message) => {
|
||||||
|
resolve(message);
|
||||||
|
};
|
||||||
|
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
|
||||||
|
Emitter.on("cloudevent", emitter);
|
||||||
|
|
||||||
|
fixture.emit(false);
|
||||||
|
});
|
||||||
|
let body: unknown = (<Message>(<unknown>msg)).body;
|
||||||
|
if (typeof body === "string") {
|
||||||
|
body = JSON.parse(body);
|
||||||
|
}
|
||||||
|
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery", async () => {
|
||||||
|
let msg: Message | unknown = undefined;
|
||||||
|
const fn = async (message: Message) => {
|
||||||
|
msg = message;
|
||||||
|
};
|
||||||
|
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
|
||||||
|
Emitter.on("cloudevent", emitter);
|
||||||
|
await fixture.emit(true);
|
||||||
|
let body: any = (<Message>msg).body;
|
||||||
|
if (typeof body === "string") {
|
||||||
|
body = JSON.parse(body);
|
||||||
|
}
|
||||||
|
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery Error", async () => {
|
||||||
|
const emitter = async () => {
|
||||||
|
throw new Error("Not sent");
|
||||||
|
};
|
||||||
|
Emitter.on("cloudevent", emitter);
|
||||||
|
// Should fail with emitWithEnsureDelivery
|
||||||
|
await rejects(() => fixture.emit(true));
|
||||||
|
// Should not fail with emitWithEnsureDelivery
|
||||||
|
// Work locally but not on Github Actions
|
||||||
|
if (!process.env.GITHUB_WORKFLOW) {
|
||||||
|
await doesNotReject(() => fixture.emit(false));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
Loading…
Reference in New Issue