From 133c38129f0ad9733b6763004616dccd24e9d9c5 Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 23 Mar 2021 12:31:58 +0000 Subject: [PATCH] Batch event handling This adds batch event handling to CloudEventFormatter, and implements it in both JSON formatters. Tests in the next commit. Handling in the various protocol bindings is not part of this commit. Signed-off-by: Jon Skeet --- .../AvroEventFormatter.cs | 6 + .../JsonEventFormatter.cs | 77 +++++++++++- .../JsonEventFormatter.cs | 117 ++++++++++++++---- .../CloudEventFormatter.cs | 54 ++++++++ .../Core/MimeUtilities.cs | 6 + 5 files changed, 231 insertions(+), 29 deletions(-) diff --git a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs index c521d02..6ffee29 100644 --- a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs +++ b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs @@ -67,6 +67,12 @@ namespace CloudNative.CloudEvents return DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes); } + public override IReadOnlyList DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) => + throw new NotSupportedException("The Avro event formatter does not support batch content mode"); + + public override byte[] EncodeBatchModeMessage(IEnumerable cloudEvent, out ContentType contentType) => + throw new NotSupportedException("The Avro event formatter does not support batch content mode"); + private CloudEvent DecodeGenericRecord(GenericRecord record, IEnumerable extensionAttributes) { if (!record.TryGetValue(AttributeName, out var attrObj)) diff --git a/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs b/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs index d01542c..7789f99 100644 --- a/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs +++ b/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs @@ -85,6 +85,9 @@ namespace CloudNative.CloudEvents.NewtonsoftJson private const string JsonMediaType = "application/json"; private const string MediaTypeSuffix = "+json"; + private static readonly string StructuredMediaType = CloudEvent.MediaType + MediaTypeSuffix; + private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix; + /// /// The property name to use for base64-encoded binary data in a structured-mode message. /// @@ -137,6 +140,44 @@ namespace CloudNative.CloudEvents.NewtonsoftJson public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) => DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes); + public override async Task> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable extensionAttributes) + { + Validation.CheckNotNull(data, nameof(data)); + + var jsonReader = CreateJsonReader(data, MimeUtilities.GetEncoding(contentType)); + var array = await JArray.LoadAsync(jsonReader).ConfigureAwait(false); + return DecodeJArray(array, extensionAttributes, nameof(data)); + } + + public override IReadOnlyList DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes) + { + Validation.CheckNotNull(data, nameof(data)); + + var jsonReader = CreateJsonReader(data, MimeUtilities.GetEncoding(contentType)); + var array = JArray.Load(jsonReader); + return DecodeJArray(array, extensionAttributes, nameof(data)); + } + + public override IReadOnlyList DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) => + DecodeBatchModeMessage(new MemoryStream(data), contentType, extensionAttributes); + + private IReadOnlyList DecodeJArray(JArray jArray, IEnumerable extensionAttributes, string paramName) + { + List events = new List(jArray.Count); + foreach (var token in jArray) + { + if (token is JObject obj) + { + events.Add(DecodeJObject(obj, extensionAttributes)); + } + else + { + throw new ArgumentException($"Invalid array element index {events.Count} within batch; expected an object, but token type was '{token?.Type}'", paramName); + } + } + return events; + } + private CloudEvent DecodeJObject(JObject jObject, IEnumerable extensionAttributes = null) { if (!jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken) @@ -307,15 +348,45 @@ namespace CloudNative.CloudEvents.NewtonsoftJson public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) { - Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent)); + // The cloudEvent parameter will be validated in WriteCloudEventForBatchOrStructuredMode - contentType = new ContentType("application/cloudevents+json") + contentType = new ContentType(StructuredMediaType) { CharSet = Encoding.UTF8.WebName }; var stream = new MemoryStream(); var writer = new JsonTextWriter(new StreamWriter(stream)); + WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent); + writer.Flush(); + return stream.ToArray(); + } + + public override byte[] EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType) + { + Validation.CheckNotNull(cloudEvents, nameof(cloudEvents)); + + contentType = new ContentType(BatchMediaType) + { + CharSet = Encoding.UTF8.WebName + }; + + var stream = new MemoryStream(); + var writer = new JsonTextWriter(new StreamWriter(stream)); + writer.WriteStartArray(); + foreach (var cloudEvent in cloudEvents) + { + WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent); + } + writer.WriteEndArray(); + writer.Flush(); + return stream.ToArray(); + } + + private void WriteCloudEventForBatchOrStructuredMode(JsonWriter writer, CloudEvent cloudEvent) + { + Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent)); + writer.WriteStartObject(); writer.WritePropertyName(CloudEventsSpecVersion.SpecVersionAttribute.Name); writer.WriteValue(cloudEvent.SpecVersion.VersionId); @@ -344,8 +415,6 @@ namespace CloudNative.CloudEvents.NewtonsoftJson EncodeStructuredModeData(cloudEvent, writer); } writer.WriteEndObject(); - writer.Flush(); - return stream.ToArray(); } /// diff --git a/src/CloudNative.CloudEvents.SystemTextJson/JsonEventFormatter.cs b/src/CloudNative.CloudEvents.SystemTextJson/JsonEventFormatter.cs index cea160c..6254c5f 100644 --- a/src/CloudNative.CloudEvents.SystemTextJson/JsonEventFormatter.cs +++ b/src/CloudNative.CloudEvents.SystemTextJson/JsonEventFormatter.cs @@ -70,6 +70,9 @@ namespace CloudNative.CloudEvents.SystemTextJson private const string JsonMediaType = "application/json"; private const string MediaTypeSuffix = "+json"; + private static readonly string StructuredMediaType = CloudEvent.MediaType + MediaTypeSuffix; + private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix; + /// /// The property name to use for base64-encoded binary data in a structured-mode message. /// @@ -121,32 +124,72 @@ namespace CloudNative.CloudEvents.SystemTextJson private async Task DecodeStructuredModeMessageImpl(Stream data, ContentType contentType, IEnumerable extensionAttributes, bool async) { Validation.CheckNotNull(data, nameof(data)); + JsonDocument document = await ReadDocumentAsync(data, contentType, async).ConfigureAwait(false); + using (document) + { + return DecodeJsonElement(document.RootElement, extensionAttributes, nameof(data)); + } + } + public override Task> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable extensionAttributes) => + DecodeBatchModeMessageImpl(data, contentType, extensionAttributes, true); + + public override IReadOnlyList DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes) => + DecodeBatchModeMessageImpl(data, contentType, extensionAttributes, false).GetAwaiter().GetResult(); + + public override IReadOnlyList DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) => + DecodeBatchModeMessageImpl(new MemoryStream(data), contentType, extensionAttributes, false).GetAwaiter().GetResult(); + + private async Task> DecodeBatchModeMessageImpl(Stream data, ContentType contentType, IEnumerable extensionAttributes, bool async) + { + Validation.CheckNotNull(data, nameof(data)); + var document = await ReadDocumentAsync(data, contentType, async).ConfigureAwait(false); + using (document) + { + var root = document.RootElement; + if (root.ValueKind != JsonValueKind.Array) + { + throw new ArgumentException($"Cannot decode JSON element of kind '{root.ValueKind}' as batch CloudEvent"); + } + // Avoiding LINQ to avoid extraneous allocations etc. + List events = new List(root.GetArrayLength()); + foreach (var element in root.EnumerateArray()) + { + events.Add(DecodeJsonElement(element, extensionAttributes, nameof(data))); + } + return events; + } + } + + private async Task ReadDocumentAsync(Stream data, ContentType contentType, bool async) + { var encoding = MimeUtilities.GetEncoding(contentType); - JsonDocument document; if (encoding is UTF8Encoding) { - document = async + return async ? await JsonDocument.ParseAsync(data, DocumentOptions).ConfigureAwait(false) : JsonDocument.Parse(data, DocumentOptions); } else { using var reader = new StreamReader(data, encoding); - string json = async + var json = async ? await reader.ReadToEndAsync().ConfigureAwait(false) : reader.ReadToEnd(); - document = JsonDocument.Parse(json, DocumentOptions); - } - using (document) - { - return DecodeJsonDocument(document, extensionAttributes); + return JsonDocument.Parse(json, DocumentOptions); } } - private CloudEvent DecodeJsonDocument(JsonDocument document, IEnumerable extensionAttributes = null) + // TODO: Override the other methods + + private CloudEvent DecodeJsonElement(JsonElement element, IEnumerable extensionAttributes, string paramName) { - if (!document.RootElement.TryGetProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionProperty) + if (element.ValueKind != JsonValueKind.Object) + { + throw new ArgumentException($"Cannot decode JSON element of kind '{element.ValueKind}' as CloudEvent"); + } + + if (!element.TryGetProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionProperty) || specVersionProperty.ValueKind != JsonValueKind.String) { throw new ArgumentException($"Structured mode content does not represent a CloudEvent"); @@ -155,16 +198,14 @@ namespace CloudNative.CloudEvents.SystemTextJson ?? throw new ArgumentException($"Unsupported CloudEvents spec version '{specVersionProperty.GetString()}'"); var cloudEvent = new CloudEvent(specVersion, extensionAttributes); - PopulateAttributesFromStructuredEvent(cloudEvent, document); - PopulateDataFromStructuredEvent(cloudEvent, document); - // "data" is always the parameter from the public method. It's annoying not to be able to use - // nameof here, but this will give the appropriate result. - return Validation.CheckCloudEventArgument(cloudEvent, "data"); + PopulateAttributesFromStructuredEvent(cloudEvent, element); + PopulateDataFromStructuredEvent(cloudEvent, element); + return Validation.CheckCloudEventArgument(cloudEvent, paramName); } - private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JsonDocument document) + private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JsonElement element) { - foreach (var jsonProperty in document.RootElement.EnumerateObject()) + foreach (var jsonProperty in element.EnumerateObject()) { var name = jsonProperty.Name; var value = jsonProperty.Value; @@ -237,11 +278,11 @@ namespace CloudNative.CloudEvents.SystemTextJson } } - private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JsonDocument document) + private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JsonElement element) { // Fetch data and data_base64 tokens, and treat null as missing. - document.RootElement.TryGetProperty(DataPropertyName, out var dataElement); - document.RootElement.TryGetProperty(DataBase64PropertyName, out var dataBase64Element); + element.TryGetProperty(DataPropertyName, out var dataElement); + element.TryGetProperty(DataBase64PropertyName, out var dataBase64Element); bool dataPresent = dataElement.ValueKind != JsonValueKind.Null && dataElement.ValueKind != JsonValueKind.Undefined; bool dataBase64Present = dataBase64Element.ValueKind != JsonValueKind.Null && dataBase64Element.ValueKind != JsonValueKind.Undefined; @@ -320,17 +361,45 @@ namespace CloudNative.CloudEvents.SystemTextJson ? dataElement.GetString() : (object) dataElement.Clone(); // Deliberately cast to object to provide the conditional operator expression type. - public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) + public override byte[] EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType) { - Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent)); + Validation.CheckNotNull(cloudEvents, nameof(cloudEvents)); - contentType = new ContentType("application/cloudevents+json") + contentType = new ContentType(BatchMediaType) { CharSet = Encoding.UTF8.WebName }; var stream = new MemoryStream(); var writer = new Utf8JsonWriter(stream); + writer.WriteStartArray(); + foreach (var cloudEvent in cloudEvents) + { + WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent); + } + writer.WriteEndArray(); + writer.Flush(); + return stream.ToArray(); + } + + public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) + { + contentType = new ContentType(StructuredMediaType) + { + CharSet = Encoding.UTF8.WebName + }; + + var stream = new MemoryStream(); + var writer = new Utf8JsonWriter(stream); + WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent); + writer.Flush(); + return stream.ToArray(); + } + + private void WriteCloudEventForBatchOrStructuredMode(Utf8JsonWriter writer, CloudEvent cloudEvent) + { + Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent)); + writer.WriteStartObject(); writer.WritePropertyName(CloudEventsSpecVersion.SpecVersionAttribute.Name); writer.WriteStringValue(cloudEvent.SpecVersion.VersionId); @@ -360,8 +429,6 @@ namespace CloudNative.CloudEvents.SystemTextJson EncodeStructuredModeData(cloudEvent, writer); } writer.WriteEndObject(); - writer.Flush(); - return stream.ToArray(); } /// diff --git a/src/CloudNative.CloudEvents/CloudEventFormatter.cs b/src/CloudNative.CloudEvents/CloudEventFormatter.cs index 43dabdc..9cebe42 100644 --- a/src/CloudNative.CloudEvents/CloudEventFormatter.cs +++ b/src/CloudNative.CloudEvents/CloudEventFormatter.cs @@ -110,5 +110,59 @@ namespace CloudNative.CloudEvents /// event formatter. /// The binary-mode representation of the CloudEvent. public abstract byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent); + + /// + /// Decodes a collection CloudEvents from a batch-mode message body, represented as a byte array. + /// + /// The data within the message body. Must not be null. + /// The content type of the message, or null if no content type is known. + /// Typically this is a content type with a media type with a prefix of "application/cloudevents-batch"; the additional + /// information such as the charset parameter may be needed in order to decode the data. + /// The extension attributes to use when populating the CloudEvent. May be null. + /// The collection of CloudEvents derived from the batched data. + public abstract IReadOnlyList DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes); + + /// + /// Decodes a collection CloudEvents from a batch-mode message body, represented as a stream. The default implementation copies the + /// content of the stream into a byte array before passing it to + /// but this can be overridden by event formatters that can decode a stream more efficiently. + /// + /// The data within the message body. Must not be null. + /// The content type of the message, or null if no content type is known. + /// Typically this is a content type with a media type with a prefix of "application/cloudevents"; the additional + /// information such as the charset parameter may be needed in order to decode the data. + /// The extension attributes to use when populating the CloudEvent. May be null. + /// The collection of CloudEvents derived from the batched data. + public virtual IReadOnlyList DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes) + { + var bytes = BinaryDataUtilities.ToByteArray(data); + return DecodeBatchModeMessage(bytes, contentType, extensionAttributes); + } + + /// + /// Asynchronously decodes a collection CloudEvents from a batch-mode message body, represented as a stream. The default implementation asynchronously copies the + /// content of the stream into a byte array before passing it to + /// but this can be overridden by event formatters that can decode a stream more efficiently. + /// + /// The data within the message body. Must not be null. + /// The content type of the message, or null if no content type is known. + /// Typically this is a content type with a media type with a prefix of "application/cloudevents"; the additional + /// information such as the charset parameter may be needed in order to decode the data. + /// The extension attributes to use when populating the CloudEvent. May be null. + /// The collection of CloudEvents derived from the batched data. + public virtual async Task> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable extensionAttributes) + { + var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false); + return DecodeBatchModeMessage(bytes, contentType, extensionAttributes); + } + + /// + // Encodes a sequence of CloudEvents as the body of a message. + /// + /// The CloudEvents to encode. Must not be null. + /// On successful return, the content type of the structured-mode data. + /// Must not be null (on return). + /// The batch representation of the CloudEvent. + public abstract byte[] EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType); } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents/Core/MimeUtilities.cs b/src/CloudNative.CloudEvents/Core/MimeUtilities.cs index 95ccc86..deaebe2 100644 --- a/src/CloudNative.CloudEvents/Core/MimeUtilities.cs +++ b/src/CloudNative.CloudEvents/Core/MimeUtilities.cs @@ -16,6 +16,12 @@ namespace CloudNative.CloudEvents.Core /// public static class MimeUtilities { + /// + /// The media type to use for batch mode. This is usually suffixed with a format-specific + /// type, e.g. "+json". + /// + public static string BatchMediaType { get; } = CloudEvent.MediaType + "-batch"; + // TODO: Should we return null, and force the caller to do the appropriate defaulting? /// /// Returns an encoding from a content type, defaulting to UTF-8.