Batch event handling

This adds batch event handling to CloudEventFormatter, and
implements it in both JSON formatters. Tests in the next commit.

Handling in the various protocol bindings is not part of this commit.

Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
Jon Skeet 2021-03-23 12:31:58 +00:00 committed by Jon Skeet
parent 216124c2cf
commit 133c38129f
5 changed files with 231 additions and 29 deletions

View File

@ -67,6 +67,12 @@ namespace CloudNative.CloudEvents
return DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
}
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
throw new NotSupportedException("The Avro event formatter does not support batch content mode");
public override byte[] EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvent, out ContentType contentType) =>
throw new NotSupportedException("The Avro event formatter does not support batch content mode");
private CloudEvent DecodeGenericRecord(GenericRecord record, IEnumerable<CloudEventAttribute> extensionAttributes)
{
if (!record.TryGetValue(AttributeName, out var attrObj))

View File

@ -85,6 +85,9 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
private const string JsonMediaType = "application/json";
private const string MediaTypeSuffix = "+json";
private static readonly string StructuredMediaType = CloudEvent.MediaType + MediaTypeSuffix;
private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix;
/// <summary>
/// The property name to use for base64-encoded binary data in a structured-mode message.
/// </summary>
@ -137,6 +140,44 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
public override async Task<IReadOnlyList<CloudEvent>> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
{
Validation.CheckNotNull(data, nameof(data));
var jsonReader = CreateJsonReader(data, MimeUtilities.GetEncoding(contentType));
var array = await JArray.LoadAsync(jsonReader).ConfigureAwait(false);
return DecodeJArray(array, extensionAttributes, nameof(data));
}
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
{
Validation.CheckNotNull(data, nameof(data));
var jsonReader = CreateJsonReader(data, MimeUtilities.GetEncoding(contentType));
var array = JArray.Load(jsonReader);
return DecodeJArray(array, extensionAttributes, nameof(data));
}
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
DecodeBatchModeMessage(new MemoryStream(data), contentType, extensionAttributes);
private IReadOnlyList<CloudEvent> DecodeJArray(JArray jArray, IEnumerable<CloudEventAttribute> extensionAttributes, string paramName)
{
List<CloudEvent> events = new List<CloudEvent>(jArray.Count);
foreach (var token in jArray)
{
if (token is JObject obj)
{
events.Add(DecodeJObject(obj, extensionAttributes));
}
else
{
throw new ArgumentException($"Invalid array element index {events.Count} within batch; expected an object, but token type was '{token?.Type}'", paramName);
}
}
return events;
}
private CloudEvent DecodeJObject(JObject jObject, IEnumerable<CloudEventAttribute> extensionAttributes = null)
{
if (!jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken)
@ -307,15 +348,45 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
// The cloudEvent parameter will be validated in WriteCloudEventForBatchOrStructuredMode
contentType = new ContentType("application/cloudevents+json")
contentType = new ContentType(StructuredMediaType)
{
CharSet = Encoding.UTF8.WebName
};
var stream = new MemoryStream();
var writer = new JsonTextWriter(new StreamWriter(stream));
WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent);
writer.Flush();
return stream.ToArray();
}
public override byte[] EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvents, out ContentType contentType)
{
Validation.CheckNotNull(cloudEvents, nameof(cloudEvents));
contentType = new ContentType(BatchMediaType)
{
CharSet = Encoding.UTF8.WebName
};
var stream = new MemoryStream();
var writer = new JsonTextWriter(new StreamWriter(stream));
writer.WriteStartArray();
foreach (var cloudEvent in cloudEvents)
{
WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent);
}
writer.WriteEndArray();
writer.Flush();
return stream.ToArray();
}
private void WriteCloudEventForBatchOrStructuredMode(JsonWriter writer, CloudEvent cloudEvent)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
writer.WriteStartObject();
writer.WritePropertyName(CloudEventsSpecVersion.SpecVersionAttribute.Name);
writer.WriteValue(cloudEvent.SpecVersion.VersionId);
@ -344,8 +415,6 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
EncodeStructuredModeData(cloudEvent, writer);
}
writer.WriteEndObject();
writer.Flush();
return stream.ToArray();
}
/// <summary>

View File

@ -70,6 +70,9 @@ namespace CloudNative.CloudEvents.SystemTextJson
private const string JsonMediaType = "application/json";
private const string MediaTypeSuffix = "+json";
private static readonly string StructuredMediaType = CloudEvent.MediaType + MediaTypeSuffix;
private static readonly string BatchMediaType = MimeUtilities.BatchMediaType + MediaTypeSuffix;
/// <summary>
/// The property name to use for base64-encoded binary data in a structured-mode message.
/// </summary>
@ -121,32 +124,72 @@ namespace CloudNative.CloudEvents.SystemTextJson
private async Task<CloudEvent> DecodeStructuredModeMessageImpl(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes, bool async)
{
Validation.CheckNotNull(data, nameof(data));
JsonDocument document = await ReadDocumentAsync(data, contentType, async).ConfigureAwait(false);
using (document)
{
return DecodeJsonElement(document.RootElement, extensionAttributes, nameof(data));
}
}
public override Task<IReadOnlyList<CloudEvent>> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
DecodeBatchModeMessageImpl(data, contentType, extensionAttributes, true);
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
DecodeBatchModeMessageImpl(data, contentType, extensionAttributes, false).GetAwaiter().GetResult();
public override IReadOnlyList<CloudEvent> DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
DecodeBatchModeMessageImpl(new MemoryStream(data), contentType, extensionAttributes, false).GetAwaiter().GetResult();
private async Task<IReadOnlyList<CloudEvent>> DecodeBatchModeMessageImpl(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes, bool async)
{
Validation.CheckNotNull(data, nameof(data));
var document = await ReadDocumentAsync(data, contentType, async).ConfigureAwait(false);
using (document)
{
var root = document.RootElement;
if (root.ValueKind != JsonValueKind.Array)
{
throw new ArgumentException($"Cannot decode JSON element of kind '{root.ValueKind}' as batch CloudEvent");
}
// Avoiding LINQ to avoid extraneous allocations etc.
List<CloudEvent> events = new List<CloudEvent>(root.GetArrayLength());
foreach (var element in root.EnumerateArray())
{
events.Add(DecodeJsonElement(element, extensionAttributes, nameof(data)));
}
return events;
}
}
private async Task<JsonDocument> ReadDocumentAsync(Stream data, ContentType contentType, bool async)
{
var encoding = MimeUtilities.GetEncoding(contentType);
JsonDocument document;
if (encoding is UTF8Encoding)
{
document = async
return async
? await JsonDocument.ParseAsync(data, DocumentOptions).ConfigureAwait(false)
: JsonDocument.Parse(data, DocumentOptions);
}
else
{
using var reader = new StreamReader(data, encoding);
string json = async
var json = async
? await reader.ReadToEndAsync().ConfigureAwait(false)
: reader.ReadToEnd();
document = JsonDocument.Parse(json, DocumentOptions);
}
using (document)
{
return DecodeJsonDocument(document, extensionAttributes);
return JsonDocument.Parse(json, DocumentOptions);
}
}
private CloudEvent DecodeJsonDocument(JsonDocument document, IEnumerable<CloudEventAttribute> extensionAttributes = null)
// TODO: Override the other methods
private CloudEvent DecodeJsonElement(JsonElement element, IEnumerable<CloudEventAttribute> extensionAttributes, string paramName)
{
if (!document.RootElement.TryGetProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionProperty)
if (element.ValueKind != JsonValueKind.Object)
{
throw new ArgumentException($"Cannot decode JSON element of kind '{element.ValueKind}' as CloudEvent");
}
if (!element.TryGetProperty(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionProperty)
|| specVersionProperty.ValueKind != JsonValueKind.String)
{
throw new ArgumentException($"Structured mode content does not represent a CloudEvent");
@ -155,16 +198,14 @@ namespace CloudNative.CloudEvents.SystemTextJson
?? throw new ArgumentException($"Unsupported CloudEvents spec version '{specVersionProperty.GetString()}'");
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);
PopulateAttributesFromStructuredEvent(cloudEvent, document);
PopulateDataFromStructuredEvent(cloudEvent, document);
// "data" is always the parameter from the public method. It's annoying not to be able to use
// nameof here, but this will give the appropriate result.
return Validation.CheckCloudEventArgument(cloudEvent, "data");
PopulateAttributesFromStructuredEvent(cloudEvent, element);
PopulateDataFromStructuredEvent(cloudEvent, element);
return Validation.CheckCloudEventArgument(cloudEvent, paramName);
}
private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JsonDocument document)
private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JsonElement element)
{
foreach (var jsonProperty in document.RootElement.EnumerateObject())
foreach (var jsonProperty in element.EnumerateObject())
{
var name = jsonProperty.Name;
var value = jsonProperty.Value;
@ -237,11 +278,11 @@ namespace CloudNative.CloudEvents.SystemTextJson
}
}
private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JsonDocument document)
private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JsonElement element)
{
// Fetch data and data_base64 tokens, and treat null as missing.
document.RootElement.TryGetProperty(DataPropertyName, out var dataElement);
document.RootElement.TryGetProperty(DataBase64PropertyName, out var dataBase64Element);
element.TryGetProperty(DataPropertyName, out var dataElement);
element.TryGetProperty(DataBase64PropertyName, out var dataBase64Element);
bool dataPresent = dataElement.ValueKind != JsonValueKind.Null && dataElement.ValueKind != JsonValueKind.Undefined;
bool dataBase64Present = dataBase64Element.ValueKind != JsonValueKind.Null && dataBase64Element.ValueKind != JsonValueKind.Undefined;
@ -320,17 +361,45 @@ namespace CloudNative.CloudEvents.SystemTextJson
? dataElement.GetString()
: (object) dataElement.Clone(); // Deliberately cast to object to provide the conditional operator expression type.
public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
public override byte[] EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvents, out ContentType contentType)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
Validation.CheckNotNull(cloudEvents, nameof(cloudEvents));
contentType = new ContentType("application/cloudevents+json")
contentType = new ContentType(BatchMediaType)
{
CharSet = Encoding.UTF8.WebName
};
var stream = new MemoryStream();
var writer = new Utf8JsonWriter(stream);
writer.WriteStartArray();
foreach (var cloudEvent in cloudEvents)
{
WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent);
}
writer.WriteEndArray();
writer.Flush();
return stream.ToArray();
}
public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
contentType = new ContentType(StructuredMediaType)
{
CharSet = Encoding.UTF8.WebName
};
var stream = new MemoryStream();
var writer = new Utf8JsonWriter(stream);
WriteCloudEventForBatchOrStructuredMode(writer, cloudEvent);
writer.Flush();
return stream.ToArray();
}
private void WriteCloudEventForBatchOrStructuredMode(Utf8JsonWriter writer, CloudEvent cloudEvent)
{
Validation.CheckCloudEventArgument(cloudEvent, nameof(cloudEvent));
writer.WriteStartObject();
writer.WritePropertyName(CloudEventsSpecVersion.SpecVersionAttribute.Name);
writer.WriteStringValue(cloudEvent.SpecVersion.VersionId);
@ -360,8 +429,6 @@ namespace CloudNative.CloudEvents.SystemTextJson
EncodeStructuredModeData(cloudEvent, writer);
}
writer.WriteEndObject();
writer.Flush();
return stream.ToArray();
}
/// <summary>

View File

@ -110,5 +110,59 @@ namespace CloudNative.CloudEvents
/// event formatter.</exception>
/// <returns>The binary-mode representation of the CloudEvent.</returns>
public abstract byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent);
/// <summary>
/// Decodes a collection CloudEvents from a batch-mode message body, represented as a byte array.
/// </summary>
/// <param name="data">The data within the message body. Must not be null.</param>
/// <param name="contentType">The content type of the message, or null if no content type is known.
/// Typically this is a content type with a media type with a prefix of "application/cloudevents-batch"; the additional
/// information such as the charset parameter may be needed in order to decode the data.</param>
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <returns>The collection of CloudEvents derived from the batched data.</returns>
public abstract IReadOnlyList<CloudEvent> DecodeBatchModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes);
/// <summary>
/// Decodes a collection CloudEvents from a batch-mode message body, represented as a stream. The default implementation copies the
/// content of the stream into a byte array before passing it to <see cref="DecodeBatchModeMessage(byte[], ContentType, IEnumerable{CloudEventAttribute})"/>
/// but this can be overridden by event formatters that can decode a stream more efficiently.
/// </summary>
/// <param name="data">The data within the message body. Must not be null.</param>
/// <param name="contentType">The content type of the message, or null if no content type is known.
/// Typically this is a content type with a media type with a prefix of "application/cloudevents"; the additional
/// information such as the charset parameter may be needed in order to decode the data.</param>
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <returns>The collection of CloudEvents derived from the batched data.</returns>
public virtual IReadOnlyList<CloudEvent> DecodeBatchModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
{
var bytes = BinaryDataUtilities.ToByteArray(data);
return DecodeBatchModeMessage(bytes, contentType, extensionAttributes);
}
/// <summary>
/// Asynchronously decodes a collection CloudEvents from a batch-mode message body, represented as a stream. The default implementation asynchronously copies the
/// content of the stream into a byte array before passing it to <see cref="DecodeBatchModeMessage(byte[], ContentType, IEnumerable{CloudEventAttribute})"/>
/// but this can be overridden by event formatters that can decode a stream more efficiently.
/// </summary>
/// <param name="data">The data within the message body. Must not be null.</param>
/// <param name="contentType">The content type of the message, or null if no content type is known.
/// Typically this is a content type with a media type with a prefix of "application/cloudevents"; the additional
/// information such as the charset parameter may be needed in order to decode the data.</param>
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
/// <returns>The collection of CloudEvents derived from the batched data.</returns>
public virtual async Task<IReadOnlyList<CloudEvent>> DecodeBatchModeMessageAsync(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
{
var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false);
return DecodeBatchModeMessage(bytes, contentType, extensionAttributes);
}
/// <summary>
// Encodes a sequence of CloudEvents as the body of a message.
/// </summary>
/// <param name="cloudEvents">The CloudEvents to encode. Must not be null.</param>
/// <param name="contentType">On successful return, the content type of the structured-mode data.
/// Must not be null (on return).</param>
/// <returns>The batch representation of the CloudEvent.</returns>
public abstract byte[] EncodeBatchModeMessage(IEnumerable<CloudEvent> cloudEvents, out ContentType contentType);
}
}

View File

@ -16,6 +16,12 @@ namespace CloudNative.CloudEvents.Core
/// </summary>
public static class MimeUtilities
{
/// <summary>
/// The media type to use for batch mode. This is usually suffixed with a format-specific
/// type, e.g. "+json".
/// </summary>
public static string BatchMediaType { get; } = CloudEvent.MediaType + "-batch";
// TODO: Should we return null, and force the caller to do the appropriate defaulting?
/// <summary>
/// Returns an encoding from a content type, defaulting to UTF-8.