diff --git a/src/message/index.ts b/src/message/index.ts index bf1dd5c..4eda732 100644 --- a/src/message/index.ts +++ b/src/message/index.ts @@ -22,9 +22,9 @@ export * from "./mqtt"; * @property {@link Deserializer} `toEvent` - converts a Message into a CloudEvent * @property {@link Detector} `isEvent` - determines if a Message can be converted to a CloudEvent */ -export interface Binding { - binary: Serializer; - structured: Serializer; +export interface Binding { + binary: Serializer; + structured: Serializer; toEvent: Deserializer; isEvent: Detector; } @@ -65,8 +65,8 @@ export enum Mode { * CloudEvent into a Message. * @interface */ -export interface Serializer { - (event: CloudEventV1): Message; +export interface Serializer { + (event: CloudEventV1): M; } /** diff --git a/src/message/kafka/index.ts b/src/message/kafka/index.ts index 85f3eec..83ade52 100644 --- a/src/message/kafka/index.ts +++ b/src/message/kafka/index.ts @@ -22,7 +22,7 @@ export type { * Bindings for Kafka transport * @implements {@linkcode Binding} */ - const Kafka: Binding = { + const Kafka: Binding, KafkaMessage> = { binary: toBinaryKafkaMessage, structured: toStructuredKafkaMessage, toEvent: deserializeKafkaMessage, @@ -35,9 +35,9 @@ type Key = string | Buffer; * Extends the base Message type to include * Kafka-specific fields */ -interface KafkaMessage extends Message { +interface KafkaMessage extends Message { key: Key - value: T | string | Buffer | unknown + value: T timestamp?: string } @@ -61,7 +61,7 @@ interface KafkaEvent extends CloudEventV1 { * @param {KafkaEvent} event The event to serialize * @returns {KafkaMessage} a KafkaMessage instance */ -function toBinaryKafkaMessage(event: CloudEventV1): KafkaMessage { +function toBinaryKafkaMessage(event: CloudEventV1): KafkaMessage { // 3.2.1. Content Type // For the binary mode, the header content-type property MUST be mapped directly // to the CloudEvents datacontenttype attribute. @@ -86,7 +86,7 @@ function toBinaryKafkaMessage(event: CloudEventV1): KafkaMessage { * @param {CloudEvent} event the CloudEvent to be serialized * @returns {KafkaMessage} a KafkaMessage instance */ - function toStructuredKafkaMessage(event: CloudEventV1): KafkaMessage { + function toStructuredKafkaMessage(event: CloudEventV1): KafkaMessage { if ((event instanceof CloudEvent) && event.data_base64) { // The event's data is binary - delete it event = event.cloneWith({ data: undefined }); @@ -130,9 +130,9 @@ function deserializeKafkaMessage(message: Message): CloudEvent | CloudEven case Mode.BINARY: return parseBinary(m); case Mode.STRUCTURED: - return parseStructured(m); + return parseStructured(m as unknown as KafkaMessage); case Mode.BATCH: - return parseBatched(m); + return parseBatched(m as unknown as KafkaMessage); default: throw new ValidationError("Unknown Message mode"); } @@ -212,14 +212,14 @@ function parseBinary(message: KafkaMessage): CloudEvent { * @param {KafkaMessage} message the message * @returns {CloudEvent} a KafkaEvent */ -function parseStructured(message: KafkaMessage): CloudEvent { +function parseStructured(message: KafkaMessage): CloudEvent { // Although the format of a structured encoded event could be something // other than JSON, e.g. XML, we currently only support JSON // encoded structured events. if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) { throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`); } - const eventObj = JSON.parse(message.value as string); + const eventObj = JSON.parse(message.value); eventObj.time = new Date(eventObj.time).toISOString(); return new CloudEvent({ ...eventObj, @@ -232,14 +232,14 @@ function parseStructured(message: KafkaMessage): CloudEvent { * @param {KafkaMessage} message the message * @returns {CloudEvent[]} an array of KafkaEvent */ -function parseBatched(message: KafkaMessage): CloudEvent[] { +function parseBatched(message: KafkaMessage): CloudEvent[] { // Although the format of batch encoded events could be something // other than JSON, e.g. XML, we currently only support JSON // encoded structured events. if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_BATCH)) { throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`); } - const events = JSON.parse(message.value as string) as Record[]; + const events = JSON.parse(message.value) as Record[]; return events.map((e) => new CloudEvent({ ...e, partitionkey: message.key }, false)); } diff --git a/src/message/mqtt/index.ts b/src/message/mqtt/index.ts index 3eee93f..f576c56 100644 --- a/src/message/mqtt/index.ts +++ b/src/message/mqtt/index.ts @@ -15,7 +15,7 @@ export type { MQTTMessage }; * Extends the base {@linkcode Message} interface to include MQTT attributes, some of which * are aliases of the {Message} attributes. */ -interface MQTTMessage extends Message { +interface MQTTMessage extends Message { /** * Identifies this message as a PUBLISH packet. MQTTMessages created with * the `binary` and `structured` Serializers will contain a "Content Type" @@ -37,7 +37,7 @@ interface MQTTMessage extends Message { * Binding for MQTT transport support * @implements @linkcode Binding */ -const MQTT: Binding = { +const MQTT: Binding = { binary, structured, toEvent: toEvent as Deserializer,