feat: add support for kafka transport (#455)

This commit extends the `message` package to include Kafka transport.

Additionally, some of the type information has changed across the project
to more accurately reflect the type of `Message` (by including `T`).

Related: https://github.com/cloudevents/sdk-javascript/issues/390
Signed-off-by: Lance Ball <lball@redhat.com>
This commit is contained in:
Lance Ball 2022-01-07 16:14:09 -05:00 committed by GitHub
parent 2ac731eb88
commit 5d1f744f50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 762 additions and 8909 deletions

View File

@ -162,7 +162,7 @@ There you will find Express.js, TypeScript and Websocket examples.
| ---------------------- | ----------------------------------------------------- | ----------------------------------------------------- |
| AMQP Protocol Binding | :x: | :x: |
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Protocol Binding | :x: | :x: |
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| NATS Protocol Binding | :x: | :x: |
@ -173,6 +173,9 @@ There you will find Express.js, TypeScript and Websocket examples.
| HTTP Binary | :heavy_check_mark: | :heavy_check_mark: |
| HTTP Structured | :heavy_check_mark: | :heavy_check_mark: |
| HTTP Batch | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark: |
## Community
- There are bi-weekly calls immediately following the [Serverless/CloudEvents

8915
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -12,7 +12,7 @@
"lint:fix": "eslint 'src/**/*.{js,ts}' 'test/**/*.{js,ts}' --fix",
"pretest": "npm run lint && npm run conformance",
"test": "mocha --require ts-node/register ./test/integration/**/*.ts",
"conformance": "cucumber-js ./conformance/features/http-protocol-binding.feature -p default",
"conformance": "cucumber-js ./conformance/features/*-protocol-binding.feature -p default",
"coverage": "nyc --reporter=lcov --reporter=text npm run test",
"coverage-publish": "wget -qO - https://coverage.codacy.com/get.sh | bash -s report -l JavaScript -r coverage/lcov.info",
"generate-docs": "typedoc --excludeNotDocumented --out docs src",

View File

@ -8,7 +8,9 @@ import { ValidationError } from "./event/validation";
import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
import { Headers, Mode, Binding, HTTP, Message, Serializer, Deserializer } from "./message";
import {
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
Serializer, Deserializer } from "./message";
import CONSTANTS from "./constants";
@ -27,6 +29,9 @@ export {
Deserializer,
Serializer,
HTTP,
Kafka,
KafkaEvent,
KafkaMessage,
// From transport
TransportFunction,
EmitterFunction,

View File

@ -6,8 +6,9 @@
import { IncomingHttpHeaders } from "http";
import { CloudEventV1 } from "..";
// reexport the HTTP protocol binding
// reexport the protocol bindings
export * from "./http";
export * from "./kafka";
/**
* Binding is an interface for transport protocols to implement,

View File

@ -0,0 +1,72 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
import { CloudEventV1, CONSTANTS, Headers } from "../..";
// The shape of the KAFKA_CE_HEADERS constant below: one header name per
// core CloudEvent attribute.
// NOTE(review): the string index signature widens every property lookup to
// 'string', so the named keys above it add no extra type safety - confirm intent
type KafkaHeaders = Readonly<{
ID: string;
TYPE: string;
SOURCE: string;
SPEC_VERSION: string;
TIME: string;
SUBJECT: string;
DATACONTENTTYPE: string;
DATASCHEMA: string;
[key: string]: string;
}>
/**
 * The set of CloudEvent headers that may exist on a Kafka message.
 * Each value is the CloudEvent attribute name prefixed with "ce_" per
 * https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#3231-property-names
 */
export const KAFKA_CE_HEADERS: KafkaHeaders = Object.freeze({
/** corresponds to the CloudEvent#id */
ID: "ce_id",
/** corresponds to the CloudEvent#type */
TYPE: "ce_type",
/** corresponds to the CloudEvent#source */
SOURCE: "ce_source",
/** corresponds to the CloudEvent#specversion */
SPEC_VERSION: "ce_specversion",
/** corresponds to the CloudEvent#time */
TIME: "ce_time",
/** corresponds to the CloudEvent#subject */
SUBJECT: "ce_subject",
/** corresponds to the CloudEvent#datacontenttype */
DATACONTENTTYPE: "ce_datacontenttype",
/** corresponds to the CloudEvent#dataschema */
DATASCHEMA: "ce_dataschema",
} as const);
/**
 * Maps each "ce_"-prefixed Kafka header name back to its CloudEvent
 * attribute name; used when deserializing a binary-mode Kafka message.
 */
export const HEADER_MAP: { [key: string]: string } = {
[KAFKA_CE_HEADERS.ID]: "id",
[KAFKA_CE_HEADERS.TYPE]: "type",
[KAFKA_CE_HEADERS.SOURCE]: "source",
[KAFKA_CE_HEADERS.SPEC_VERSION]: "specversion",
[KAFKA_CE_HEADERS.TIME]: "time",
[KAFKA_CE_HEADERS.SUBJECT]: "subject",
[KAFKA_CE_HEADERS.DATACONTENTTYPE]: "datacontenttype",
[KAFKA_CE_HEADERS.DATASCHEMA]: "dataschema"
};
/**
 * A convenience function to convert a CloudEvent's attributes into
 * Kafka message headers. The `data` and `data_base64` attributes are
 * skipped, because event data travels in the Kafka message's `value`
 * field rather than in a header.
 * @param {CloudEventV1<T>} event a CloudEvent object
 * @returns {Headers} the CloudEvent attributes as Kafka headers
 */
export function headersFor<T>(event: CloudEventV1<T>): Headers {
  const headers: Headers = {};
  Object.getOwnPropertyNames(event).forEach((property) => {
    // Ignore the 'data' property
    // it becomes the Kafka message's 'value' field
    if (property !== CONSTANTS.CE_ATTRIBUTES.DATA && property !== CONSTANTS.STRUCTURED_ATTRS_1.DATA_BASE64) {
      // all CloudEvent property names get prefixed with 'ce_'
      // https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#3231-property-names
      headers[`ce_${property}`] = event[property] as string;
    }
  });
  return headers;
}

270
src/message/kafka/index.ts Normal file
View File

@ -0,0 +1,270 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
import { CloudEvent, CloudEventV1, CONSTANTS, Mode, ValidationError } from "../..";
import { Message, Headers, Binding } from "..";
import { headersFor, HEADER_MAP, KAFKA_CE_HEADERS } from "./headers";
import { sanitize } from "../http/headers";
// Export the binding implementation and message interface
export {
Kafka,
KafkaMessage,
KafkaEvent
};
/**
 * Bindings for Kafka transport
 * support. Implements Serializer (binary, structured),
 * Deserializer (toEvent) and Detector (isEvent) functions.
 * @implements {@linkcode Binding}
 */
const Kafka: Binding = {
binary: toBinaryKafkaMessage,
structured: toStructuredKafkaMessage,
toEvent: deserializeKafkaMessage,
isEvent: isKafkaEvent,
};
// A Kafka message key may be a string or raw bytes
type Key = string | Buffer;
/**
 * Extends the base Message type to include
 * Kafka-specific fields
 */
interface KafkaMessage<T = string> extends Message {
// maps to/from CloudEvent#partitionkey - see KafkaEvent below
key: Key
// NOTE(review): the 'unknown' member widens this whole union to 'unknown',
// so 'T | string | Buffer' is documentary only - confirm intent
value: T | string | Buffer | unknown
// epoch milliseconds as a string (set from the event's time attribute
// by the serializers below)
timestamp?: string
}
/**
 * Extends the base CloudEventV1 interface to include a `partitionkey` field
 * which is explicitly mapped to KafkaMessage#key
 */
interface KafkaEvent<T> extends CloudEventV1<T> {
/**
 * Maps to KafkaMessage#key per
 * https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
 */
partitionkey: Key
}
/**
 * Serialize a CloudEvent for Kafka in binary mode. The event's data
 * becomes the message value/body, while all other attributes travel
 * as "ce_"-prefixed headers.
 * @implements {Serializer}
 * @see https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#32-binary-content-mode
 *
 * @param {CloudEventV1<T>} event The event to serialize
 * @returns {KafkaMessage<T>} a KafkaMessage instance
 */
function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
  // 3.2.1. Content Type
  // In binary mode the content-type header is mapped directly
  // from the CloudEvent's datacontenttype attribute
  const headers: Headers = {
    [CONSTANTS.HEADER_CONTENT_TYPE]: event.datacontenttype,
    ...headersFor(event),
  };
  return {
    // partitionkey maps to the Kafka message key
    key: event.partitionkey as Key,
    value: event.data,
    body: event.data,
    headers,
    timestamp: timestamp(event.time),
  };
}
/**
 * Serialize a CloudEvent for Kafka in structured mode. The entire
 * event is serialized (via CloudEvent#toString) into the message
 * value/body, and the content-type header marks it as a CloudEvent.
 * @implements {Serializer}
 * @see https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#33-structured-content-mode
 *
 * @param {CloudEventV1<T>} event the CloudEvent to be serialized
 * @returns {KafkaMessage<T>} a KafkaMessage instance
 */
function toStructuredKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
  if (event instanceof CloudEvent && event.data_base64) {
    // The event's data is binary - drop the duplicate 'data' attribute
    // so only data_base64 is serialized
    event = event.cloneWith({ data: undefined });
  }
  const payload = event.toString();
  return {
    // Not every event carries a partitionkey, but when one exists it
    // maps to the KafkaMessage#key per
    // https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
    key: event.partitionkey as Key,
    value: payload,
    body: payload,
    headers: {
      [CONSTANTS.HEADER_CONTENT_TYPE]: CONSTANTS.DEFAULT_CE_CONTENT_TYPE,
    },
    timestamp: timestamp(event.time),
  };
}
/**
 * Converts a Message to a CloudEvent, dispatching on the message's
 * content mode (binary, structured or batch).
 * @implements {Deserializer}
 *
 * @param {Message} message the incoming message
 * @return {KafkaEvent} A new {KafkaEvent} instance
 * @throws {ValidationError} if the message is not a CloudEvent, or is
 * missing its value or headers
 */
function deserializeKafkaMessage<T>(message: Message): CloudEvent<T> | CloudEvent<T>[] {
  if (!isKafkaEvent(message)) {
    throw new ValidationError("No CloudEvent detected");
  }
  const kafkaMessage = message as KafkaMessage<T>;
  if (!kafkaMessage.value) {
    throw new ValidationError("Value is null or undefined");
  }
  if (!kafkaMessage.headers) {
    throw new ValidationError("Headers are null or undefined");
  }
  // normalize header casing before sniffing the content mode
  const mode: Mode = getMode(sanitize(kafkaMessage.headers));
  if (mode === Mode.BINARY) {
    return parseBinary(kafkaMessage);
  }
  if (mode === Mode.STRUCTURED) {
    return parseStructured(kafkaMessage);
  }
  if (mode === Mode.BATCH) {
    return parseBatched(kafkaMessage);
  }
  throw new ValidationError("Unknown Message mode");
}
/**
 * Determine if a Message is a CloudEvent via Kafka headers
 * @implements {Detector}
 *
 * @param {Message} message an incoming Message object
 * @returns {boolean} true if this Message is a CloudEvent
 */
function isKafkaEvent(message: Message): boolean {
  const headers = sanitize(message.headers);
  // A binary mode event carries the ce_id header
  if (headers[KAFKA_CE_HEADERS.ID]) {
    return true;
  }
  // A structured event or a batch of events is identified by content-type.
  // Guard explicitly so the function always returns a boolean - the previous
  // `a || b || c` form returned `undefined` when no content-type was present.
  const contentType = headers[CONSTANTS.HEADER_CONTENT_TYPE];
  return !!contentType &&
    (contentType.startsWith(CONSTANTS.MIME_CE) || contentType.startsWith(CONSTANTS.MIME_CE_BATCH));
}
/**
 * Determines what content mode a Kafka message is in given the provided headers.
 * Batch and structured events are signalled by the content-type header;
 * anything else is treated as binary mode.
 * @param {Headers} headers the headers
 * @returns {Mode} the content mode of the KafkaMessage
 */
function getMode(headers: Headers): Mode {
  const contentType = headers[CONSTANTS.HEADER_CONTENT_TYPE];
  if (contentType?.startsWith(CONSTANTS.MIME_CE_BATCH)) {
    return Mode.BATCH;
  }
  if (contentType?.startsWith(CONSTANTS.MIME_CE)) {
    return Mode.STRUCTURED;
  }
  return Mode.BINARY;
}
/**
 * Parses a binary kafka CE message and returns a CloudEvent.
 * Known "ce_" headers are mapped to their CloudEvent attribute names;
 * any remaining "ce_"-prefixed headers become extension attributes.
 * @param {KafkaMessage} message the message
 * @returns {CloudEvent<T>} a CloudEvent<T>
 */
function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
const eventObj: { [key: string ]: unknown } = {};
// copy the headers so known ones can be deleted as they are consumed
const headers = { ...message.headers };
// content-type maps directly to datacontenttype (may be undefined)
eventObj.datacontenttype = headers[CONSTANTS.HEADER_CONTENT_TYPE];
for (const key in KAFKA_CE_HEADERS) {
const h = KAFKA_CE_HEADERS[key];
if (!!headers[h]) {
eventObj[HEADER_MAP[h]] = headers[h];
if (h === KAFKA_CE_HEADERS.TIME) {
// normalize the incoming time value to an ISO 8601 string
eventObj.time = new Date(eventObj.time as string).toISOString();
}
// consume the header so only extensions remain below
delete headers[h];
}
}
// Any remaining headers are extension attributes
// TODO: The spec is unclear on whether these should
// be prefixed with 'ce_' as headers. We assume it is
for (const key in headers) {
if (key.startsWith("ce_")) {
eventObj[key.replace("ce_", "")] = headers[key];
}
}
// construct with validation off ('false') so invalid events from
// external systems can still be deserialized and inspected
return new CloudEvent<T>({
...eventObj,
data: extractBinaryData(message),
partitionkey: message.key,
}, false);
}
/**
 * Parses a structured kafka CE message and returns a CloudEvent
 * @param {KafkaMessage<T>} message the message
 * @returns {CloudEvent<T>} a KafkaEvent<T>
 * @throws {ValidationError} if the message encoding is not JSON
 */
function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
  // Although the format of a structured encoded event could be something
  // other than JSON, e.g. XML, we currently only support JSON
  // encoded structured events.
  if (!message.headers[CONSTANTS.HEADER_CONTENT_TYPE]?.startsWith(CONSTANTS.MIME_CE_JSON)) {
    throw new ValidationError(`Unsupported event encoding ${message.headers[CONSTANTS.HEADER_CONTENT_TYPE]}`);
  }
  const eventObj = JSON.parse(message.value as string);
  // 'time' is an OPTIONAL CloudEvent attribute - only normalize it when
  // present, since new Date(undefined).toISOString() throws a RangeError
  if (eventObj.time) {
    eventObj.time = new Date(eventObj.time).toISOString();
  }
  // construct with validation off ('false') so invalid events can
  // still be deserialized and inspected
  return new CloudEvent({
    ...eventObj,
    partitionkey: message.key,
  }, false);
}
/**
 * Parses a batch kafka CE message and returns a CloudEvent[].
 * The message key (if any) is applied to every event in the batch
 * as its partitionkey.
 * @param {KafkaMessage<T>} message the message
 * @returns {CloudEvent<T>[]} an array of KafkaEvent<T>
 * @throws {ValidationError} if the message encoding is not a JSON batch
 */
function parseBatched<T>(message: KafkaMessage<T>): CloudEvent<T>[] {
  // Although the format of batch encoded events could be something
  // other than JSON, e.g. XML, we currently only support JSON
  // encoded structured events.
  const contentType = message.headers[CONSTANTS.HEADER_CONTENT_TYPE];
  if (!contentType?.startsWith(CONSTANTS.MIME_CE_BATCH)) {
    throw new ValidationError(`Unsupported event encoding ${contentType}`);
  }
  const batch = JSON.parse(message.value as string) as Record<string, unknown>[];
  // construct with validation off ('false') so invalid events can
  // still be deserialized and inspected
  return batch.map((entry) => new CloudEvent({ ...entry, partitionkey: message.key }, false));
}
/**
 * Gets the data from a binary kafka ce message as T.
 * JSON payloads (per the content-type header) are parsed eagerly,
 * whether they arrive as a string or a Buffer; any other payload is
 * returned untouched.
 * @param {KafkaMessage} message a KafkaMessage
 * @returns {string | undefined} the data in the message
 */
function extractBinaryData<T>(message: KafkaMessage<T>): T {
  const contentType = message.headers[CONSTANTS.HEADER_CONTENT_TYPE] as string;
  const { value } = message;
  if (contentType && contentType.startsWith(CONSTANTS.MIME_JSON)) {
    if (typeof value === "string") {
      return JSON.parse(value);
    }
    if (Buffer.isBuffer(value)) {
      return JSON.parse(value.toString());
    }
  }
  return value as T;
}
/**
 * Converts a possible date string into milliseconds since the Unix
 * epoch, rendered as a string - the format used for the KafkaMessage
 * `timestamp` field. (The previous doc claimed an ISO date string was
 * returned, which Date.parse does not produce.)
 * @param {string | undefined} t a possible date string
 * @returns {string | undefined} epoch milliseconds as a string, or undefined when `t` is falsy
 */
function timestamp(t: string | undefined): string | undefined {
  return t ? `${Date.parse(t)}` : undefined;
}

View File

@ -7,13 +7,39 @@
import { assert } from "chai";
import { Given, When, Then, World } from "@cucumber/cucumber";
import { Message, Headers, HTTP } from "../../src";
import { Message, Headers, HTTP, KafkaMessage, Kafka } from "../../src";
// eslint-disable-next-line @typescript-eslint/no-var-requires
const { HTTPParser } = require("http-parser-js");
const parser = new HTTPParser(HTTPParser.REQUEST);
// Kafka conformance steps
Given("Kafka Protocol Binding is supported", function (this: World) {
return true;
});
Given("a Kafka message with payload:", function (request: string) {
// Create a KafkaMessage whose value/body is the feature file's
// docstring payload, with headers filled in by a later step
const value = Buffer.from(request);
const message: KafkaMessage = {
key: "",
headers: {},
body: value,
value,
};
this.message = message;
return true;
});
Then("Kafka headers:", function (attributes: { rawTable: [] }) {
// tableToObject is a helper defined elsewhere in this file;
// presumably it converts the cucumber data table into Headers - verify
this.message.headers = tableToObject(attributes.rawTable);
});
When("parsed as Kafka message", function () {
// deserialize the accumulated message into a CloudEvent for
// subsequent Then assertions
this.cloudevent = Kafka.toEvent(this.message);
return true;
});
Given("HTTP Protocol Binding is supported", function (this: World) {
return true;
});

View File

@ -5,6 +5,7 @@
import { expect } from "chai";
import { CloudEvent, HTTP, Message } from "../../src";
import { Kafka, KafkaMessage } from "../../src/message";
const type = "org.cncf.cloudevents.example";
const source = "http://unit.test";
@ -39,3 +40,22 @@ describe("A batched CloudEvent message over HTTP", () => {
expect(ce.constructor.name).to.equal("CloudEvent");
});
});
describe("A batched CloudEvent message over Kafka", () => {
  it("Can be created with a typed Message", () => {
    const value = JSON.stringify(fixture);
    const message: KafkaMessage = {
      key: "123",
      value,
      headers: {
        "content-type": "application/cloudevents-batch+json",
      },
      body: value,
    };
    const batch = Kafka.toEvent(message);
    expect(batch.length).to.equal(10);
    // use 'unknown' rather than 'any' - the element's data type is
    // not known until inspected
    const ce = (batch as CloudEvent<unknown>[])[0];
    expect(typeof ce).to.equal("object");
    expect(ce.constructor.name).to.equal("CloudEvent");
  });
});

View File

@ -22,11 +22,11 @@ describe("Emitter Singleton", () => {
fixture.emit(false);
});
let body: unknown = (<Message>(<unknown>msg)).body;
let body: unknown = (msg as Message).body;
if (typeof body === "string") {
body = JSON.parse(body);
}
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
assertStructured({ ...(<any>body), ...(msg as Message).headers });
});
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery", async () => {
@ -37,11 +37,11 @@ describe("Emitter Singleton", () => {
const emitter = emitterFor(fn, { binding: HTTP, mode: Mode.STRUCTURED });
Emitter.on("cloudevent", emitter);
await fixture.emit(true);
let body: any = (<Message>msg).body;
let body: any = (msg as Message).body;
if (typeof body === "string") {
body = JSON.parse(body);
}
assertStructured({ ...(<any>body), ...(<Message>(<unknown>msg)).headers });
assertStructured({ ...(<any>body), ...(msg as Message).headers });
});
it("emit a Node.js 'cloudevent' event as an EventEmitter with ensureDelivery Error", async () => {

View File

@ -0,0 +1,317 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
import path from "path";
import fs from "fs";
import { expect } from "chai";
import { CloudEvent, CONSTANTS, Version } from "../../src";
import { asBase64 } from "../../src/event/validation";
import { Message, Kafka, KafkaMessage, KafkaEvent } from "../../src/message";
import { KAFKA_CE_HEADERS } from "../../src/message/kafka/headers";
// Fixture attribute values shared by the tests below
const key = "foo/bar";
const type = "org.cncf.cloudevents.example";
const source = "urn:event:from:myapi/resource/123";
const time = new Date().toISOString();
const subject = "subject.ext";
const dataschema = "http://cloudevents.io/schema.json";
const datacontenttype = "application/json";
const id = "b46cf653-d48a-4b90-8dfa-355c01061361";
// Shape of the fixture event's data payload
interface Idata {
foo: string
}
const data: Idata = {
foo: "bar",
};
const ext1Name = "extension1";
const ext1Value = "foobar";
const ext2Name = "extension2";
const ext2Value = "acme";
// Binary data as base64
const dataBinary = Uint32Array.from(JSON.stringify(data), (c) => c.codePointAt(0) as number);
const data_base64 = asBase64(dataBinary);
// Since the above is a special case (string as binary), let's test
// with a real binary file one is likely to encounter in the wild
const imageData = new Uint32Array(fs.readFileSync(path.join(process.cwd(), "test", "integration", "ce.png")));
const image_base64 = asBase64(imageData);
// A fully-populated CloudEvent used as the baseline fixture;
// 'partitionkey' maps to KafkaMessage#key in the Kafka binding
const fixture = new CloudEvent({
specversion: Version.V1,
id,
type,
source,
datacontenttype,
subject,
time,
dataschema,
data,
[ext1Name]: ext1Value,
[ext2Name]: ext2Value,
partitionkey: key,
});
describe("Kafka transport", () => {
  it("Handles events with no content-type and no datacontenttype", () => {
    // not valid JSON - should be passed through as the event data untouched
    const value = "{Something[Not:valid}JSON";
    const message: KafkaMessage<string> = {
      key,
      value,
      headers: {
        [KAFKA_CE_HEADERS.SOURCE]: "/test/kafka",
        [KAFKA_CE_HEADERS.TYPE]: "test.kafka",
        [KAFKA_CE_HEADERS.ID]: "1234",
      },
      body: undefined,
    };
    const event: CloudEvent = Kafka.toEvent(message) as CloudEvent;
    expect(event.data).to.equal(value);
    // no content-type header was provided, so datacontenttype is unset.
    // (was 'event.datacontentype' - a typo that made the check vacuous)
    expect(event.datacontenttype).to.equal(undefined);
  });
it("Can detect invalid CloudEvent Messages", () => {
// Create a message that is not an actual event
// (no ce_ headers and a non-CloudEvent content-type)
const message: KafkaMessage<string> = {
key,
value: "Hello world!",
headers: {
"Content-type": "text/plain",
},
body: undefined
};
expect(Kafka.isEvent(message)).to.be.false;
});
it("Can detect valid CloudEvent Messages", () => {
// Now create a message that is an event
const message = Kafka.binary(
new CloudEvent<Idata>({
source: "/message-test",
type: "example",
data,
}),
);
expect(Kafka.isEvent(message)).to.be.true;
});
it("Handles CloudEvents with datacontenttype of text/plain", () => {
// non-JSON content types should survive a binary round trip
const message: Message<string> = Kafka.binary(
new CloudEvent({
source: "/test",
type: "example",
datacontenttype: "text/plain",
data: "Hello, friends!",
}),
);
const event = Kafka.toEvent(message) as CloudEvent<string>;
expect(event.validate()).to.be.true;
});
it("Respects extension attribute casing (even if against spec)", () => {
// Create a message that is an event
const message: KafkaMessage<string> = {
key,
body: undefined,
value: `{ "greeting": "hello" }`,
headers: {
[KAFKA_CE_HEADERS.ID]: "1234",
[KAFKA_CE_HEADERS.SOURCE]: "test",
[KAFKA_CE_HEADERS.TYPE]: "test.event",
"ce_LUNCH": "tacos",
},
};
expect(Kafka.isEvent(message)).to.be.true;
const event = Kafka.toEvent(message) as CloudEvent<string>;
// the upper-case extension name is preserved on the event...
expect(event.LUNCH).to.equal("tacos");
// ...but strict validation rejects it
expect(function () {
event.validate();
}).to.throw("invalid attribute name: LUNCH");
});
it("Can detect CloudEvent binary Messages with weird versions", () => {
// Now create a message that is an event
const message: KafkaMessage<string> = {
key,
body: undefined,
value: `{ "greeting": "hello" }`,
headers: {
[KAFKA_CE_HEADERS.ID]: "1234",
[KAFKA_CE_HEADERS.SOURCE]: "test",
[KAFKA_CE_HEADERS.TYPE]: "test.event",
[KAFKA_CE_HEADERS.SPEC_VERSION]: "11.8",
},
};
expect(Kafka.isEvent(message)).to.be.true;
const event = Kafka.toEvent(message) as CloudEvent;
// the unknown specversion is preserved, but the event fails validation
expect(event.specversion).to.equal("11.8");
expect(event.validate()).to.be.false;
});
it("Can detect CloudEvent structured Messages with weird versions", () => {
// Now create a message that is an event
const message: KafkaMessage<string> = {
key,
body: undefined,
value: `{ "source": "test", "type": "test.event", "specversion": "11.8"}`,
headers: {
[KAFKA_CE_HEADERS.ID]: "1234",
},
};
expect(Kafka.isEvent(message)).to.be.true;
expect(Kafka.toEvent(message)).not.to.throw;
});
// Allow for external systems to send bad events - do what we can
// to accept them
it("Does not throw an exception when converting an invalid Message to a CloudEvent", () => {
const message: KafkaMessage<string> = {
key,
body: undefined,
value: `"hello world"`,
headers: {
[CONSTANTS.HEADER_CONTENT_TYPE]: "application/json",
[KAFKA_CE_HEADERS.ID]: "1234",
[KAFKA_CE_HEADERS.TYPE]: "example.bad.event",
// no required ce_source header, thus an invalid event
},
};
const event = Kafka.toEvent(message) as CloudEvent;
expect(event).to.be.instanceOf(CloudEvent);
// ensure that we actually now have an invalid event
// NOTE(review): '.to.throw' is accessed but never invoked here, so this
// assertion is a no-op - should likely be '.to.throw()'; confirm intent
expect(event.validate).to.throw;
});
it("Does not allow an invalid CloudEvent to be converted to a Message", () => {
const badEvent = new CloudEvent(
{
source: "/example.source",
type: "", // type is required, empty string will throw with strict validation
},
false, // turn off strict validation
);
// NOTE(review): same uninvoked '.to.throw' no-op as above - confirm intent
expect(() => {
Kafka.binary(badEvent);
}).to.throw;
expect(() => {
Kafka.structured(badEvent);
}).to.throw;
});
// https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
it("Maps `KafkaMessage#key` value to CloudEvent#partitionkey property", () => {
const message: KafkaMessage<string> = {
key,
body: undefined,
value: `{ "source": "test", "type": "test.event", "specversion": "11.8"}`,
headers: {
[KAFKA_CE_HEADERS.ID]: "1234",
},
};
const event = Kafka.toEvent(message) as KafkaEvent<string>;
expect(event.partitionkey).to.equal(key);
});
// https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#31-key-mapping
it("Maps CloudEvent#partitionkey value to a `key` in binary KafkaMessages", () => {
const event = new CloudEvent({
source,
type,
partitionkey: key,
});
const message = Kafka.binary(event) as KafkaMessage;
expect(message.key).to.equal(key);
});
it("Binary Messages can be created from a CloudEvent", () => {
const message: Message<Idata> = Kafka.binary(fixture);
// binary mode carries the event data unchanged in the body
expect(message.body).to.equal(data);
// validate all headers
expect(message.headers[CONSTANTS.HEADER_CONTENT_TYPE]).to.equal(datacontenttype);
expect(message.headers[KAFKA_CE_HEADERS.SPEC_VERSION]).to.equal(Version.V1);
expect(message.headers[KAFKA_CE_HEADERS.ID]).to.equal(id);
expect(message.headers[KAFKA_CE_HEADERS.TYPE]).to.equal(type);
expect(message.headers[KAFKA_CE_HEADERS.SOURCE]).to.equal(source);
expect(message.headers[KAFKA_CE_HEADERS.SUBJECT]).to.equal(subject);
expect(message.headers[KAFKA_CE_HEADERS.TIME]).to.equal(fixture.time);
expect(message.headers[KAFKA_CE_HEADERS.DATASCHEMA]).to.equal(dataschema);
// extensions are prefixed with 'ce_' like the core attributes
expect(message.headers[`ce_${ext1Name}`]).to.equal(ext1Value);
expect(message.headers[`ce_${ext2Name}`]).to.equal(ext2Value);
});
it("Structured Messages can be created from a CloudEvent", () => {
const message: Message<Idata> = Kafka.structured(fixture);
expect(message.headers[CONSTANTS.HEADER_CONTENT_TYPE]).to.equal(CONSTANTS.DEFAULT_CE_CONTENT_TYPE);
// Parse the message body as JSON, then validate the attributes
const body = JSON.parse(message.body as string);
expect(body[CONSTANTS.CE_ATTRIBUTES.SPEC_VERSION]).to.equal(Version.V1);
expect(body[CONSTANTS.CE_ATTRIBUTES.ID]).to.equal(id);
expect(body[CONSTANTS.CE_ATTRIBUTES.TYPE]).to.equal(type);
expect(body[CONSTANTS.CE_ATTRIBUTES.SOURCE]).to.equal(source);
expect(body[CONSTANTS.CE_ATTRIBUTES.SUBJECT]).to.equal(subject);
expect(body[CONSTANTS.CE_ATTRIBUTES.TIME]).to.equal(fixture.time);
expect(body[CONSTANTS.STRUCTURED_ATTRS_1.DATA_SCHEMA]).to.equal(dataschema);
expect(body[ext1Name]).to.equal(ext1Value);
expect(body[ext2Name]).to.equal(ext2Value);
});
it("A CloudEvent can be converted from a binary Message", () => {
const message = Kafka.binary(fixture);
const event = Kafka.toEvent(message);
// The Kafka deserializer sets a partitionkey, so compare against a
// fixture spread that carries the deserialized key
expect(event).to.deep.equal({...fixture, partitionkey: (event as KafkaEvent<any>).partitionkey});
});
// Renamed: this test previously duplicated the title of the preceding
// test verbatim. It is the stricter variant - the fixture already sets
// partitionkey to the message key, so the round trip is exact.
it("A CloudEvent round-trips losslessly through a binary Message", () => {
  const message = Kafka.binary(fixture);
  const event = Kafka.toEvent(message);
  expect(event).to.deep.equal(fixture);
});
it("A CloudEvent can be converted from a structured Message", () => {
const message = Kafka.structured(fixture);
const event = Kafka.toEvent(message);
expect(event).to.deep.equal(fixture);
});
it("Converts binary data to base64 when serializing structured messages", () => {
const event = fixture.cloneWith({ data: imageData, datacontenttype: "image/png" });
expect(event.data).to.equal(imageData);
const message = Kafka.structured(event);
const messageBody = JSON.parse(message.body as string);
expect(messageBody.data_base64).to.equal(image_base64);
});
// NOTE(review): skipped - the structured deserialization path does not
// restore binary data yet; confirm tracking issue before enabling
it.skip("Converts base64 encoded data to binary when deserializing structured messages", () => {
const message = Kafka.structured(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = Kafka.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});
it("Converts base64 encoded data to binary when deserializing binary messages", () => {
const message = Kafka.binary(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = Kafka.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});
it("Keeps binary data binary when serializing binary messages", () => {
const event = fixture.cloneWith({ data: dataBinary });
expect(event.data).to.equal(dataBinary);
const message = Kafka.binary(event);
expect(message.body).to.equal(dataBinary);
});
it("Does not parse binary data from binary messages with content type application/json", () => {
const message = Kafka.binary(fixture.cloneWith({ data: dataBinary }));
const eventDeserialized = Kafka.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(dataBinary);
expect(eventDeserialized.data_base64).to.equal(data_base64);
});
});

View File

@ -43,7 +43,7 @@ const image_base64 = asBase64(imageData);
describe("HTTP transport", () => {
it("Handles events with no content-type and no datacontenttype", () => {
const body = "{Something[Not:valid}JSON";
const message: Message = {
const message: Message<undefined> = {
body,
headers: {
"ce-source": "/test/type",
@ -58,7 +58,7 @@ describe("HTTP transport", () => {
it("Can detect invalid CloudEvent Messages", () => {
// Create a message that is not an actual event
const message: Message = {
const message: Message<undefined> = {
body: "Hello world!",
headers: {
"Content-type": "text/plain",
@ -88,7 +88,7 @@ describe("HTTP transport", () => {
data: "Hello, friends!",
}),
);
const event = HTTP.toEvent(message) as CloudEvent;
const event = HTTP.toEvent(message) as CloudEvent<string>;
expect(event.validate()).to.be.true;
});
@ -143,7 +143,7 @@ describe("HTTP transport", () => {
// Allow for external systems to send bad events - do what we can
// to accept them
it("Does not throw an exception when converting an invalid Message to a CloudEvent", () => {
const message: Message = {
const message: Message<undefined> = {
body: `"hello world"`,
headers: {
"content-type": "application/json",
@ -186,7 +186,7 @@ describe("HTTP transport", () => {
specversion: Version.V1,
data: { lunch: "tacos" },
});
const message: Message = {
const message: Message<undefined> = {
headers,
body,
};
@ -226,7 +226,7 @@ describe("HTTP transport", () => {
});
it("Structured Messages can be created from a CloudEvent", () => {
const message: Message = HTTP.structured(fixture);
const message: Message<string> = HTTP.structured(fixture);
expect(message.headers[CONSTANTS.HEADER_CONTENT_TYPE]).to.equal(CONSTANTS.DEFAULT_CE_CONTENT_TYPE);
// Parse the message body as JSON, then validate the attributes
const body = JSON.parse(message.body as string);
@ -263,21 +263,21 @@ describe("HTTP transport", () => {
it("Converts base64 encoded data to binary when deserializing structured messages", () => {
const message = HTTP.structured(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});
it("Does not parse binary data from structured messages with content type application/json", () => {
const message = HTTP.structured(fixture.cloneWith({ data: dataBinary }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(dataBinary);
expect(eventDeserialized.data_base64).to.equal(data_base64);
});
it("Converts base64 encoded data to binary when deserializing binary messages", () => {
const message = HTTP.binary(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});
@ -291,7 +291,7 @@ describe("HTTP transport", () => {
it("Does not parse binary data from binary messages with content type application/json", () => {
const message = HTTP.binary(fixture.cloneWith({ data: dataBinary }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(dataBinary);
expect(eventDeserialized.data_base64).to.equal(data_base64);
});
@ -368,14 +368,14 @@ describe("HTTP transport", () => {
// Creating an event with binary data automatically produces base64 encoded data
// which is then set as the 'data' attribute on the message body
const message = HTTP.structured(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});
it("Converts base64 encoded data to binary when deserializing binary messages", () => {
const message = HTTP.binary(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
const eventDeserialized = HTTP.toEvent(message) as CloudEvent;
const eventDeserialized = HTTP.toEvent(message) as CloudEvent<Uint32Array>;
expect(eventDeserialized.data).to.deep.equal(imageData);
expect(eventDeserialized.data_base64).to.equal(image_base64);
});