feat: add MQTT transport messaging (#459)
Add MQTT as a `Message` format. This commit adds `MQTT` to the supported transport protocols by adding a `Binding` and the `MQTTMessage<T>` type, extending the base `Message` type, adding the MQTT fields for `payload`, `PUBLISH` and `User Properties`. The `payload` field directly maps to `Message#body`, while `User Properties` roughly maps to `Message#headers`, even though the properties here are not formatted with a `ce-` prefix like other transport protocols. This is per the spec. See: https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md. Signed-off-by: Lance Ball <lball@redhat.com>
This commit is contained in:
parent
5d1f744f50
commit
591d133f31
|
@ -163,7 +163,7 @@ There you will find Express.js, TypeScript and Websocket examples.
|
||||||
| AMQP Protocol Binding | :x: | :x: |
|
| AMQP Protocol Binding | :x: | :x: |
|
||||||
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
|
| HTTP Protocol Binding | :heavy_check_mark: | :heavy_check_mark: |
|
||||||
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
|
| Kafka Protocol Binding | :x: | :heavy_check_mark: |
|
||||||
| MQTT Protocol Binding | :x: | :x: |
|
| MQTT Protocol Binding | :heavy_check_mark: | :x: |
|
||||||
| NATS Protocol Binding | :x: | :x: |
|
| NATS Protocol Binding | :x: | :x: |
|
||||||
|
|
||||||
---
|
---
|
||||||
|
@ -176,6 +176,9 @@ There you will find Express.js, TypeScript and Websocket examples.
|
||||||
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
|
| Kafka Binary | :heavy_check_mark: | :heavy_check_mark: |
|
||||||
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
|
| Kafka Structured | :heavy_check_mark: | :heavy_check_mark: |
|
||||||
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark:
|
| Kafka Batch | :heavy_check_mark: | :heavy_check_mark:
|
||||||
|
| MQTT Binary | :heavy_check_mark: | :heavy_check_mark: |
|
||||||
|
| MQTT Structured | :heavy_check_mark: | :heavy_check_mark: |
|
||||||
|
|
||||||
## Community
|
## Community
|
||||||
|
|
||||||
- There are bi-weekly calls immediately following the [Serverless/CloudEvents
|
- There are bi-weekly calls immediately following the [Serverless/CloudEvents
|
||||||
|
|
|
@ -9,7 +9,7 @@ import { CloudEventV1, CloudEventV1Attributes } from "./event/interfaces";
|
||||||
|
|
||||||
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
|
import { Options, TransportFunction, EmitterFunction, emitterFor, Emitter } from "./transport/emitter";
|
||||||
import {
|
import {
|
||||||
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message,
|
Headers, Mode, Binding, HTTP, Kafka, KafkaEvent, KafkaMessage, Message, MQTT, MQTTMessage, MQTTMessageFactory,
|
||||||
Serializer, Deserializer } from "./message";
|
Serializer, Deserializer } from "./message";
|
||||||
|
|
||||||
import CONSTANTS from "./constants";
|
import CONSTANTS from "./constants";
|
||||||
|
@ -32,6 +32,9 @@ export {
|
||||||
Kafka,
|
Kafka,
|
||||||
KafkaEvent,
|
KafkaEvent,
|
||||||
KafkaMessage,
|
KafkaMessage,
|
||||||
|
MQTT,
|
||||||
|
MQTTMessage,
|
||||||
|
MQTTMessageFactory,
|
||||||
// From transport
|
// From transport
|
||||||
TransportFunction,
|
TransportFunction,
|
||||||
EmitterFunction,
|
EmitterFunction,
|
||||||
|
|
|
@ -9,6 +9,7 @@ import { CloudEventV1 } from "..";
|
||||||
// reexport the protocol bindings
|
// reexport the protocol bindings
|
||||||
export * from "./http";
|
export * from "./http";
|
||||||
export * from "./kafka";
|
export * from "./kafka";
|
||||||
|
export * from "./mqtt";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Binding is an interface for transport protocols to implement,
|
* Binding is an interface for transport protocols to implement,
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
Copyright 2021 The CloudEvents Authors
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { Binding, Deserializer, CloudEvent, CloudEventV1, CONSTANTS, Message, ValidationError, Headers } from "../..";
|
||||||
|
|
||||||
|
export {
|
||||||
|
MQTT,
|
||||||
|
MQTTMessage,
|
||||||
|
MQTTMessageFactory
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extends the base {@linkcode Message} interface to include MQTT attributes, some of which
|
||||||
|
* are aliases of the {Message} attributes.
|
||||||
|
*/
|
||||||
|
interface MQTTMessage<T> extends Message<T> {
|
||||||
|
/**
|
||||||
|
* Identifies this message as a PUBLISH packet. MQTTMessages created with
|
||||||
|
* the `binary` and `structured` Serializers will contain a "Content Type"
|
||||||
|
* property in the PUBLISH record.
|
||||||
|
* @see https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#3-mqtt-publish-message-mapping
|
||||||
|
*/
|
||||||
|
PUBLISH: Record<string, string | undefined> | undefined
|
||||||
|
/**
|
||||||
|
* Alias of {Message#body}
|
||||||
|
*/
|
||||||
|
payload: T | undefined,
|
||||||
|
/**
|
||||||
|
* Alias of {Message#headers}
|
||||||
|
*/
|
||||||
|
"User Properties": Headers | undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Binding for MQTT transport support
|
||||||
|
* @implements @linkcode Binding
|
||||||
|
*/
|
||||||
|
const MQTT: Binding = {
|
||||||
|
binary,
|
||||||
|
structured,
|
||||||
|
toEvent: toEvent as Deserializer,
|
||||||
|
isEvent
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a CloudEvent into an MQTTMessage<T> with the event's data as the message payload
|
||||||
|
* @param {CloudEventV1} event a CloudEvent
|
||||||
|
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with binary encoding
|
||||||
|
* @implements {Serializer}
|
||||||
|
*/
|
||||||
|
function binary<T>(event: CloudEventV1<T>): MQTTMessage<T> {
|
||||||
|
let properties;
|
||||||
|
if (event instanceof CloudEvent) {
|
||||||
|
properties = event.toJSON();
|
||||||
|
} else {
|
||||||
|
properties = event;
|
||||||
|
}
|
||||||
|
const body = properties.data as T;
|
||||||
|
delete properties.data;
|
||||||
|
|
||||||
|
return MQTTMessageFactory(event.datacontenttype as string, properties, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a CloudEvent into an MQTTMessage<T> with the event as the message payload
|
||||||
|
* @param {CloudEventV1} event a CloudEvent
|
||||||
|
* @returns {MQTTMessage<T>} the event serialized as an MQTTMessage<T> with structured encoding
|
||||||
|
* @implements {Serializer}
|
||||||
|
*/
|
||||||
|
function structured<T>(event: CloudEventV1<T>): MQTTMessage<T> {
|
||||||
|
let body;
|
||||||
|
if (event instanceof CloudEvent) {
|
||||||
|
body = event.toJSON();
|
||||||
|
} else {
|
||||||
|
body = event;
|
||||||
|
}
|
||||||
|
return MQTTMessageFactory(CONSTANTS.DEFAULT_CE_CONTENT_TYPE, {}, body) as MQTTMessage<T>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper function to create an MQTTMessage<T> object, with "User Properties" as an alias
|
||||||
|
* for "headers" and "payload" an alias for body, and a "PUBLISH" record with a "Content Type"
|
||||||
|
* property.
|
||||||
|
* @param {string} contentType the "Content Type" attribute on PUBLISH
|
||||||
|
* @param {Record<string, unknown>} headers the headers and "User Properties"
|
||||||
|
* @param {T} body the message body/payload
|
||||||
|
* @returns {MQTTMessage<T>} a message initialized with the provided attributes
|
||||||
|
*/
|
||||||
|
function MQTTMessageFactory<T>(contentType: string, headers: Record<string, unknown>, body: T): MQTTMessage<T> {
|
||||||
|
return {
|
||||||
|
PUBLISH: {
|
||||||
|
"Content Type": contentType
|
||||||
|
},
|
||||||
|
body,
|
||||||
|
get payload() {
|
||||||
|
return this.body as T;
|
||||||
|
},
|
||||||
|
headers: headers as Headers,
|
||||||
|
get "User Properties"() {
|
||||||
|
return this.headers as any;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts an MQTTMessage<T> into a CloudEvent
|
||||||
|
* @param {Message<T>} message the message to deserialize
|
||||||
|
* @param {boolean} strict determines if a ValidationError will be thrown on bad input - defaults to false
|
||||||
|
* @returns {CloudEventV1<T>} an event
|
||||||
|
* @implements {Deserializer}
|
||||||
|
*/
|
||||||
|
function toEvent<T>(message: Message<T>, strict = false): CloudEventV1<T> | CloudEventV1<T>[] {
|
||||||
|
if (strict && !isEvent(message)) {
|
||||||
|
throw new ValidationError("No CloudEvent detected");
|
||||||
|
}
|
||||||
|
if (isStructuredMessage(message as MQTTMessage<T>)) {
|
||||||
|
const evt = (typeof message.body === "string") ? JSON.parse(message.body): message.body;
|
||||||
|
return new CloudEvent({
|
||||||
|
...evt as CloudEventV1<T>
|
||||||
|
}, false);
|
||||||
|
} else {
|
||||||
|
return new CloudEvent<T>({
|
||||||
|
...message.headers,
|
||||||
|
data: message.body as T,
|
||||||
|
}, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine if the message is a CloudEvent
|
||||||
|
* @param {Message<T>} message an MQTTMessage
|
||||||
|
* @returns {boolean} true if the message contains an event
|
||||||
|
*/
|
||||||
|
function isEvent<T>(message: Message<T>): boolean {
|
||||||
|
return isBinaryMessage(message) || isStructuredMessage(message as MQTTMessage<T>);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isBinaryMessage<T>(message: Message<T>): boolean {
|
||||||
|
return (!!message.headers.id && !!message.headers.source
|
||||||
|
&& !! message.headers.type && !!message.headers.specversion);
|
||||||
|
}
|
||||||
|
|
||||||
|
function isStructuredMessage<T>(message: MQTTMessage<T>): boolean {
|
||||||
|
if (!message) { return false; }
|
||||||
|
return (message.PUBLISH && message?.PUBLISH["Content Type"]?.startsWith(CONSTANTS.MIME_CE_JSON)) || false;
|
||||||
|
}
|
|
@ -0,0 +1,309 @@
|
||||||
|
/*
|
||||||
|
Copyright 2021 The CloudEvents Authors
|
||||||
|
SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
import path from "path";
|
||||||
|
import fs from "fs";
|
||||||
|
|
||||||
|
import { expect } from "chai";
|
||||||
|
import { CloudEvent, CONSTANTS, Version, Headers } from "../../src";
|
||||||
|
import { asBase64 } from "../../src/event/validation";
|
||||||
|
import { Message, MQTT, MQTTMessage } from "../../src/message";
|
||||||
|
|
||||||
|
const type = "org.cncf.cloudevents.example";
|
||||||
|
const source = "urn:event:from:myapi/resource/123";
|
||||||
|
const time = new Date().toISOString();
|
||||||
|
const subject = "subject.ext";
|
||||||
|
const dataschema = "http://cloudevents.io/schema.json";
|
||||||
|
const datacontenttype = "application/json";
|
||||||
|
const id = "b46cf653-d48a-4b90-8dfa-355c01061361";
|
||||||
|
|
||||||
|
interface Idata {
|
||||||
|
foo: string
|
||||||
|
}
|
||||||
|
const data: Idata = {
|
||||||
|
foo: "bar",
|
||||||
|
};
|
||||||
|
|
||||||
|
const ext1Name = "extension1";
|
||||||
|
const ext1Value = "foobar";
|
||||||
|
const ext2Name = "extension2";
|
||||||
|
const ext2Value = "acme";
|
||||||
|
|
||||||
|
// Binary data as base64
|
||||||
|
const dataBinary = Uint32Array.from(JSON.stringify(data), (c) => c.codePointAt(0) as number);
|
||||||
|
const data_base64 = asBase64(dataBinary);
|
||||||
|
|
||||||
|
// Since the above is a special case (string as binary), let's test
|
||||||
|
// with a real binary file one is likely to encounter in the wild
|
||||||
|
const imageData = new Uint32Array(fs.readFileSync(path.join(process.cwd(), "test", "integration", "ce.png")));
|
||||||
|
const image_base64 = asBase64(imageData);
|
||||||
|
|
||||||
|
const PUBLISH = {"Content Type": "application/json; charset=utf-8"};
|
||||||
|
|
||||||
|
const fixture = new CloudEvent({
|
||||||
|
specversion: Version.V1,
|
||||||
|
id,
|
||||||
|
type,
|
||||||
|
source,
|
||||||
|
datacontenttype,
|
||||||
|
subject,
|
||||||
|
time,
|
||||||
|
dataschema,
|
||||||
|
data,
|
||||||
|
[ext1Name]: ext1Value,
|
||||||
|
[ext2Name]: ext2Value,
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("MQTT transport", () => {
|
||||||
|
it("Handles events with no content-type and no datacontenttype", () => {
|
||||||
|
const payload = "{Something[Not:valid}JSON";
|
||||||
|
const userProperties = fixture.toJSON() as Headers;
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
PUBLISH: undefined, // no Content Type applied
|
||||||
|
payload,
|
||||||
|
"User Properties": userProperties,
|
||||||
|
headers: userProperties,
|
||||||
|
body: payload,
|
||||||
|
};
|
||||||
|
const event = MQTT.toEvent(message) as CloudEvent<undefined>;
|
||||||
|
expect(event.data).to.equal(payload);
|
||||||
|
expect(event.datacontentype).to.equal(undefined);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Can detect invalid CloudEvent Messages", () => {
|
||||||
|
// Create a message that is not an actual event
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
payload: "Hello world!",
|
||||||
|
PUBLISH: {
|
||||||
|
"Content type": "text/plain",
|
||||||
|
},
|
||||||
|
"User Properties": {},
|
||||||
|
headers: {},
|
||||||
|
body: undefined
|
||||||
|
};
|
||||||
|
expect(MQTT.isEvent(message)).to.be.false;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Can detect valid CloudEvent Messages", () => {
|
||||||
|
// Now create a message that is an event
|
||||||
|
const message = MQTT.binary(
|
||||||
|
new CloudEvent<Idata>({
|
||||||
|
source: "/message-test",
|
||||||
|
type: "example",
|
||||||
|
data,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
expect(MQTT.isEvent(message)).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Handles CloudEvents with datacontenttype of text/plain", () => {
|
||||||
|
const message: Message<string> = MQTT.binary(
|
||||||
|
new CloudEvent({
|
||||||
|
source: "/test",
|
||||||
|
type: "example",
|
||||||
|
datacontenttype: "text/plain",
|
||||||
|
data: "Hello, friends!",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const event = MQTT.toEvent(message) as CloudEvent<string>;
|
||||||
|
expect(event.data).to.equal(message.body);
|
||||||
|
expect(event.validate()).to.be.true;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Respects extension attribute casing (even if against spec)", () => {
|
||||||
|
// Create a message that is an event
|
||||||
|
const body = `{ "greeting": "hello" }`;
|
||||||
|
const headers = {
|
||||||
|
id: "1234",
|
||||||
|
source: "test",
|
||||||
|
type: "test.event",
|
||||||
|
specversion: "1.0",
|
||||||
|
LUNCH: "tacos",
|
||||||
|
};
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
body,
|
||||||
|
payload: body,
|
||||||
|
PUBLISH,
|
||||||
|
"User Properties": headers,
|
||||||
|
headers
|
||||||
|
};
|
||||||
|
expect(MQTT.isEvent(message)).to.be.true;
|
||||||
|
const event = MQTT.toEvent(message) as CloudEvent<string>;
|
||||||
|
expect(event.LUNCH).to.equal("tacos");
|
||||||
|
expect(function () {
|
||||||
|
event.validate();
|
||||||
|
}).to.throw("invalid attribute name: LUNCH");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Can detect CloudEvent binary Messages with weird versions", () => {
|
||||||
|
// Now create a message that is an event
|
||||||
|
const body = `{ "greeting": "hello" }`;
|
||||||
|
const headers = {
|
||||||
|
id: "1234",
|
||||||
|
source: "test",
|
||||||
|
type: "test.event",
|
||||||
|
specversion: "11.8",
|
||||||
|
};
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
body,
|
||||||
|
payload: body,
|
||||||
|
PUBLISH,
|
||||||
|
headers,
|
||||||
|
"User Properties": headers,
|
||||||
|
};
|
||||||
|
expect(MQTT.isEvent(message)).to.be.true;
|
||||||
|
const event = MQTT.toEvent(message) as CloudEvent;
|
||||||
|
expect(event.specversion).to.equal("11.8");
|
||||||
|
expect(event.validate()).to.be.false;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Can detect CloudEvent structured Messages with weird versions", () => {
|
||||||
|
// Now create a message that is an event
|
||||||
|
const body = `{ "id": "123", "source": "test", "type": "test.event", "specversion": "11.8"}`;
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
body,
|
||||||
|
payload: body,
|
||||||
|
headers: {},
|
||||||
|
PUBLISH: {"Content Type": CONSTANTS.MIME_CE_JSON},
|
||||||
|
"User Properties": {}
|
||||||
|
};
|
||||||
|
expect(MQTT.isEvent(message)).to.be.true;
|
||||||
|
expect(MQTT.toEvent(message)).not.to.throw;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Allow for external systems to send bad events - do what we can
|
||||||
|
// to accept them
|
||||||
|
it("Does not throw an exception when converting an invalid Message to a CloudEvent", () => {
|
||||||
|
const body = `"hello world"`;
|
||||||
|
const headers = {
|
||||||
|
id: "1234",
|
||||||
|
type: "example.bad.event",
|
||||||
|
// no required source, thus an invalid event
|
||||||
|
};
|
||||||
|
const message: MQTTMessage<string> = {
|
||||||
|
body,
|
||||||
|
payload: body,
|
||||||
|
PUBLISH,
|
||||||
|
headers,
|
||||||
|
"User Properties": headers,
|
||||||
|
};
|
||||||
|
const event = MQTT.toEvent(message) as CloudEvent;
|
||||||
|
expect(event).to.be.instanceOf(CloudEvent);
|
||||||
|
// ensure that we actually now have an invalid event
|
||||||
|
expect(event.validate).to.throw;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Does not allow an invalid CloudEvent to be converted to a Message", () => {
|
||||||
|
const badEvent = new CloudEvent(
|
||||||
|
{
|
||||||
|
source: "/example.source",
|
||||||
|
type: "", // type is required, empty string will throw with strict validation
|
||||||
|
},
|
||||||
|
false, // turn off strict validation
|
||||||
|
);
|
||||||
|
expect(() => {
|
||||||
|
MQTT.binary(badEvent);
|
||||||
|
}).to.throw;
|
||||||
|
expect(() => {
|
||||||
|
MQTT.structured(badEvent);
|
||||||
|
}).to.throw;
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Binary Messages can be created from a CloudEvent", () => {
|
||||||
|
const message: Message<Idata> = MQTT.binary(fixture);
|
||||||
|
expect(message.body).to.equal(data);
|
||||||
|
// validate all headers
|
||||||
|
expect(message.headers.datacontenttype).to.equal(datacontenttype);
|
||||||
|
expect(message.headers.specversion).to.equal(Version.V1);
|
||||||
|
expect(message.headers.id).to.equal(id);
|
||||||
|
expect(message.headers.type).to.equal(type);
|
||||||
|
expect(message.headers.source).to.equal(source);
|
||||||
|
expect(message.headers.subject).to.equal(subject);
|
||||||
|
expect(message.headers.time).to.equal(fixture.time);
|
||||||
|
expect(message.headers.dataschema).to.equal(dataschema);
|
||||||
|
expect(message.headers[ext1Name]).to.equal(ext1Value);
|
||||||
|
expect(message.headers[ext2Name]).to.equal(ext2Value);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Sets User Properties on binary messages", () => {
|
||||||
|
const message: MQTTMessage<Idata> = MQTT.binary(fixture) as MQTTMessage<Idata>;
|
||||||
|
expect(message.body).to.equal(data);
|
||||||
|
// validate all headers
|
||||||
|
expect(message["User Properties"]?.datacontenttype).to.equal(datacontenttype);
|
||||||
|
expect(message["User Properties"]?.specversion).to.equal(Version.V1);
|
||||||
|
expect(message["User Properties"]?.id).to.equal(id);
|
||||||
|
expect(message["User Properties"]?.type).to.equal(type);
|
||||||
|
expect(message["User Properties"]?.source).to.equal(source);
|
||||||
|
expect(message["User Properties"]?.subject).to.equal(subject);
|
||||||
|
expect(message["User Properties"]?.time).to.equal(fixture.time);
|
||||||
|
expect(message["User Properties"]?.dataschema).to.equal(dataschema);
|
||||||
|
expect(message["User Properties"]?.[ext1Name]).to.equal(ext1Value);
|
||||||
|
expect(message["User Properties"]?.[ext2Name]).to.equal(ext2Value);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Structured Messages can be created from a CloudEvent", () => {
|
||||||
|
const message = MQTT.structured(fixture) as MQTTMessage<string>;
|
||||||
|
expect(message.PUBLISH?.["Content Type"]).to.equal(CONSTANTS.DEFAULT_CE_CONTENT_TYPE);
|
||||||
|
expect(message.body).to.deep.equal(message.payload);
|
||||||
|
expect(message.payload).to.deep.equal(fixture.toJSON());
|
||||||
|
const body = message.body as Record<string, string>;
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.SPEC_VERSION]).to.equal(Version.V1);
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.ID]).to.equal(id);
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.TYPE]).to.equal(type);
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.SOURCE]).to.equal(source);
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.SUBJECT]).to.equal(subject);
|
||||||
|
expect(body[CONSTANTS.CE_ATTRIBUTES.TIME]).to.equal(fixture.time);
|
||||||
|
expect(body[CONSTANTS.STRUCTURED_ATTRS_1.DATA_SCHEMA]).to.equal(dataschema);
|
||||||
|
expect(body[ext1Name]).to.equal(ext1Value);
|
||||||
|
expect(body[ext2Name]).to.equal(ext2Value);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("A CloudEvent can be converted from a binary Message", () => {
|
||||||
|
const message = MQTT.binary(fixture);
|
||||||
|
const event = MQTT.toEvent(message);
|
||||||
|
expect(event).to.deep.equal(fixture);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("A CloudEvent can be converted from a structured Message", () => {
|
||||||
|
const message = MQTT.structured(fixture);
|
||||||
|
const event = MQTT.toEvent(message);
|
||||||
|
expect(event).to.deep.equal(fixture);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Converts binary data to base64 when serializing structured messages", () => {
|
||||||
|
const event = fixture.cloneWith({ data: imageData, datacontenttype: "image/png" });
|
||||||
|
expect(event.data).to.equal(imageData);
|
||||||
|
const message = MQTT.structured(event);
|
||||||
|
expect((message.body as CloudEvent).data_base64).to.equal(image_base64);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Converts base64 encoded data to binary when deserializing structured messages", () => {
|
||||||
|
const message = MQTT.structured(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
|
||||||
|
const eventDeserialized = MQTT.toEvent(message) as CloudEvent<Uint32Array>;
|
||||||
|
expect(eventDeserialized.data).to.deep.equal(imageData);
|
||||||
|
expect(eventDeserialized.data_base64).to.equal(image_base64);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Converts base64 encoded data to binary when deserializing binary messages", () => {
|
||||||
|
const message = MQTT.binary(fixture.cloneWith({ data: imageData, datacontenttype: "image/png" }));
|
||||||
|
const eventDeserialized = MQTT.toEvent(message) as CloudEvent<Uint32Array>;
|
||||||
|
expect(eventDeserialized.data).to.deep.equal(imageData);
|
||||||
|
expect(eventDeserialized.data_base64).to.equal(image_base64);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Keeps binary data binary when serializing binary messages", () => {
|
||||||
|
const event = fixture.cloneWith({ data: dataBinary });
|
||||||
|
expect(event.data).to.equal(dataBinary);
|
||||||
|
const message = MQTT.binary(event);
|
||||||
|
expect(message.body).to.equal(dataBinary);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Does not parse binary data from binary messages with content type application/json", () => {
|
||||||
|
const message = MQTT.binary(fixture.cloneWith({ data: dataBinary }));
|
||||||
|
const eventDeserialized = MQTT.toEvent(message) as CloudEvent<Uint32Array>;
|
||||||
|
expect(eventDeserialized.data).to.deep.equal(dataBinary);
|
||||||
|
expect(eventDeserialized.data_base64).to.equal(data_base64);
|
||||||
|
});
|
||||||
|
});
|
Loading…
Reference in New Issue