fix: add generics to `Binding` type
Signed-off-by: Alex Tennant <atennant@skedulo.com> Fixes: https://github.com/cloudevents/sdk-javascript/issues/487 Signed-off-by: Alex Tennant <atennant@skedulo.com>
This commit is contained in:
parent
8357719bab
commit
5dc14b248e
|
@ -22,9 +22,9 @@ export * from "./mqtt";
|
||||||
* @property {@link Deserializer} `toEvent` - converts a Message into a CloudEvent
|
* @property {@link Deserializer} `toEvent` - converts a Message into a CloudEvent
|
||||||
* @property {@link Detector} `isEvent` - determines if a Message can be converted to a CloudEvent
|
* @property {@link Detector} `isEvent` - determines if a Message can be converted to a CloudEvent
|
||||||
*/
|
*/
|
||||||
export interface Binding {
|
export interface Binding<B extends Message = Message, S extends Message = Message> {
|
||||||
binary: Serializer;
|
binary: Serializer<B>;
|
||||||
structured: Serializer;
|
structured: Serializer<S>;
|
||||||
toEvent: Deserializer;
|
toEvent: Deserializer;
|
||||||
isEvent: Detector;
|
isEvent: Detector;
|
||||||
}
|
}
|
||||||
|
@ -65,8 +65,8 @@ export enum Mode {
|
||||||
* CloudEvent into a Message.
|
* CloudEvent into a Message.
|
||||||
* @interface
|
* @interface
|
||||||
*/
|
*/
|
||||||
export interface Serializer {
|
export interface Serializer<M extends Message> {
|
||||||
<T>(event: CloudEventV1<T>): Message;
|
<T>(event: CloudEventV1<T>): M;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -22,7 +22,7 @@ export type {
|
||||||
* Bindings for Kafka transport
|
* Bindings for Kafka transport
|
||||||
* @implements {@linkcode Binding}
|
* @implements {@linkcode Binding}
|
||||||
*/
|
*/
|
||||||
const Kafka: Binding = {
|
const Kafka: Binding<KafkaMessage<unknown>, KafkaMessage<string>> = {
|
||||||
binary: toBinaryKafkaMessage,
|
binary: toBinaryKafkaMessage,
|
||||||
structured: toStructuredKafkaMessage,
|
structured: toStructuredKafkaMessage,
|
||||||
toEvent: deserializeKafkaMessage,
|
toEvent: deserializeKafkaMessage,
|
||||||
|
@ -35,9 +35,9 @@ type Key = string | Buffer;
|
||||||
* Extends the base Message type to include
|
* Extends the base Message type to include
|
||||||
* Kafka-specific fields
|
* Kafka-specific fields
|
||||||
*/
|
*/
|
||||||
interface KafkaMessage<T = string> extends Message {
|
interface KafkaMessage<T = string | Buffer | unknown> extends Message {
|
||||||
key: Key
|
key: Key
|
||||||
value: T | string | Buffer | unknown
|
value: T
|
||||||
timestamp?: string
|
timestamp?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,7 +61,7 @@ interface KafkaEvent<T> extends CloudEventV1<T> {
|
||||||
* @param {KafkaEvent<T>} event The event to serialize
|
* @param {KafkaEvent<T>} event The event to serialize
|
||||||
* @returns {KafkaMessage<T>} a KafkaMessage instance
|
* @returns {KafkaMessage<T>} a KafkaMessage instance
|
||||||
*/
|
*/
|
||||||
function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
|
function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T | undefined> {
|
||||||
// 3.2.1. Content Type
|
// 3.2.1. Content Type
|
||||||
// For the binary mode, the header content-type property MUST be mapped directly
|
// For the binary mode, the header content-type property MUST be mapped directly
|
||||||
// to the CloudEvents datacontenttype attribute.
|
// to the CloudEvents datacontenttype attribute.
|
||||||
|
@ -86,7 +86,7 @@ function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
|
||||||
* @param {CloudEvent<T>} event the CloudEvent to be serialized
|
* @param {CloudEvent<T>} event the CloudEvent to be serialized
|
||||||
* @returns {KafkaMessage<T>} a KafkaMessage instance
|
* @returns {KafkaMessage<T>} a KafkaMessage instance
|
||||||
*/
|
*/
|
||||||
function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
|
function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<string> {
|
||||||
if ((event instanceof CloudEvent) && event.data_base64) {
|
if ((event instanceof CloudEvent) && event.data_base64) {
|
||||||
// The event's data is binary - delete it
|
// The event's data is binary - delete it
|
||||||
event = event.cloneWith({ data: undefined });
|
event = event.cloneWith({ data: undefined });
|
||||||
|
@ -130,9 +130,9 @@ function deserializeKafkaMessage<T>(message: Message): CloudEvent<T> | CloudEven
|
||||||
case Mode.BINARY:
|
case Mode.BINARY:
|
||||||
return parseBinary(m);
|
return parseBinary(m);
|
||||||
case Mode.STRUCTURED:
|
case Mode.STRUCTURED:
|
||||||
return parseStructured(m);
|
return parseStructured(m as unknown as KafkaMessage<string>);
|
||||||
case Mode.BATCH:
|
case Mode.BATCH:
|
||||||
return parseBatched(m);
|
return parseBatched(m as unknown as KafkaMessage<string>);
|
||||||
default:
|
default:
|
||||||
throw new ValidationError("Unknown Message mode");
|
throw new ValidationError("Unknown Message mode");
|
||||||
}
|
}
|
||||||
|
@ -212,14 +212,14 @@ function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
|
||||||
* @param {KafkaMessage<T>} message the message
|
* @param {KafkaMessage<T>} message the message
|
||||||
* @returns {CloudEvent<T>} a KafkaEvent<T>
|
* @returns {CloudEvent<T>} a KafkaEvent<T>
|
||||||
*/
|
*/
|
||||||
function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
|
function parseStructured<T>(message: KafkaMessage<string>): CloudEvent<T> {
|
||||||
// Although the format of a structured encoded event could be something
|
// Although the format of a structured encoded event could be something
|
||||||
// other than JSON, e.g. XML, we currently only support JSON
|
// other than JSON, e.g. XML, we currently only support JSON
|
||||||
// encoded structured events.
|
// encoded structured events.
|
||||||
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
|
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
|
||||||
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
|
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
|
||||||
}
|
}
|
||||||
const eventObj = JSON.parse(message.value as string);
|
const eventObj = JSON.parse(message.value);
|
||||||
eventObj.time = new Date(eventObj.time).toISOString();
|
eventObj.time = new Date(eventObj.time).toISOString();
|
||||||
return new CloudEvent({
|
return new CloudEvent({
|
||||||
...eventObj,
|
...eventObj,
|
||||||
|
@ -232,14 +232,14 @@ function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
|
||||||
* @param {KafkaMessage<T>} message the message
|
* @param {KafkaMessage<T>} message the message
|
||||||
* @returns {CloudEvent<T>[]} an array of KafkaEvent<T>
|
* @returns {CloudEvent<T>[]} an array of KafkaEvent<T>
|
||||||
*/
|
*/
|
||||||
function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
|
function parseBatched<T>(message: KafkaMessage<string>): CloudEvent<T>[] {
|
||||||
// Although the format of batch encoded events could be something
|
// Although the format of batch encoded events could be something
|
||||||
// other than JSON, e.g. XML, we currently only support JSON
|
// other than JSON, e.g. XML, we currently only support JSON
|
||||||
// encoded structured events.
|
// encoded structured events.
|
||||||
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_BATCH)) {
|
if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_BATCH)) {
|
||||||
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
|
throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
|
||||||
}
|
}
|
||||||
const events = JSON.parse(message.value as string) as Record<string, unknown>[];
|
const events = JSON.parse(message.value) as Record<string, unknown>[];
|
||||||
return events.map((e) => new CloudEvent({ ...e, partitionkey: message.key }, false));
|
return events.map((e) => new CloudEvent({ ...e, partitionkey: message.key }, false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ export type { MQTTMessage };
|
||||||
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
|
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
|
||||||
* are aliases of the {Message} attributes.
|
* are aliases of the {Message} attributes.
|
||||||
*/
|
*/
|
||||||
interface MQTTMessage<T> extends Message<T> {
|
interface MQTTMessage<T = unknown> extends Message<T> {
|
||||||
/**
|
/**
|
||||||
* Identifies this message as a PUBLISH packet. MQTTMessages created with
|
* Identifies this message as a PUBLISH packet. MQTTMessages created with
|
||||||
* the `binary` and `structured` Serializers will contain a "Content Type"
|
* the `binary` and `structured` Serializers will contain a "Content Type"
|
||||||
|
@ -37,7 +37,7 @@ interface MQTTMessage<T> extends Message<T> {
|
||||||
* Binding for MQTT transport support
|
* Binding for MQTT transport support
|
||||||
* @implements @linkcode Binding
|
* @implements @linkcode Binding
|
||||||
*/
|
*/
|
||||||
const MQTT: Binding = {
|
const MQTT: Binding<MQTTMessage, MQTTMessage> = {
|
||||||
binary,
|
binary,
|
||||||
structured,
|
structured,
|
||||||
toEvent: toEvent as Deserializer,
|
toEvent: toEvent as Deserializer,
|
||||||
|
|
Loading…
Reference in New Issue