diff --git a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
index da2b024..30878de 100644
--- a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
+++ b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs
@@ -6,6 +6,7 @@ using Amqp;
using Amqp.Types;
using System;
using System.IO;
+using System.Net.Mime;
namespace CloudNative.CloudEvents.Amqp
{
@@ -16,16 +17,16 @@ namespace CloudNative.CloudEvents.Amqp
internal const string SpecVersionAmqpHeader = AmqpHeaderPrefix + "specversion";
public static bool IsCloudEvent(this Message message) =>
- (message.Properties.ContentType is Symbol contentType && contentType.ToString().StartsWith(CloudEvent.MediaType)) ||
+ HasCloudEventsContentType(message, out _) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
public static CloudEvent ToCloudEvent(this Message message,
CloudEventFormatter formatter,
params CloudEventAttribute[] extensionAttributes)
{
- if (HasCloudEventsContentType(message))
+ if (HasCloudEventsContentType(message, out var contentType))
{
- return formatter.DecodeStructuredEvent(new MemoryStream((byte[])message.Body), extensionAttributes);
+ return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[])message.Body), new ContentType(contentType), extensionAttributes);
}
else
{
@@ -88,7 +89,10 @@ namespace CloudNative.CloudEvents.Amqp
}
// TODO: Check that it really is meant to be case-sensitive. (Original code was inconsistent.)
- private static bool HasCloudEventsContentType(Message message) =>
- message.Properties.ContentType is Symbol contentType && contentType.ToString().StartsWith(CloudEvent.MediaType);
+ private static bool HasCloudEventsContentType(Message message, out string contentType)
+ {
+ contentType = (message.Properties.ContentType as Symbol)?.ToString();
+ return contentType?.StartsWith(CloudEvent.MediaType) == true;
+ }
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.Amqp/AmqpCloudEventMessage.cs b/src/CloudNative.CloudEvents.Amqp/AmqpCloudEventMessage.cs
index cc0c7ec..ff128ca 100644
--- a/src/CloudNative.CloudEvents.Amqp/AmqpCloudEventMessage.cs
+++ b/src/CloudNative.CloudEvents.Amqp/AmqpCloudEventMessage.cs
@@ -22,7 +22,7 @@ namespace CloudNative.CloudEvents.Amqp
{
BodySection = new Data
{
- Binary = formatter.EncodeStructuredEvent(cloudEvent, out var contentType)
+ Binary = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType)
};
Properties = new Properties { ContentType = contentType.MediaType };
ApplicationProperties = new ApplicationProperties();
diff --git a/src/CloudNative.CloudEvents.AspNetCore/HttpRequestExtension.cs b/src/CloudNative.CloudEvents.AspNetCore/HttpRequestExtension.cs
index 0c6feee..53539f2 100644
--- a/src/CloudNative.CloudEvents.AspNetCore/HttpRequestExtension.cs
+++ b/src/CloudNative.CloudEvents.AspNetCore/HttpRequestExtension.cs
@@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Http;
using System;
using System.IO;
using System.Linq;
+using System.Net.Mime;
using System.Threading.Tasks;
namespace CloudNative.CloudEvents
@@ -14,13 +15,13 @@ namespace CloudNative.CloudEvents
public static class HttpRequestExtension
{
///
- /// Converts this HTTP request into a CloudEvent object, with the given extensions,
- /// overriding the formatter.
+ /// Converts this HTTP request into a CloudEvent object.
///
- /// HTTP request
- /// The event formatter to use to process the request body.
- /// List of extension instances
- /// A CloudEvent instance or 'null' if the request message doesn't hold a CloudEvent
+ /// The HTTP request to decode. Must not be null.
+ /// The event formatter to use to process the request body. Must not be null.
+ /// The extension attributes to use when populating the CloudEvent. May be null.
+ /// The decoded CloudEvent.
+ /// The request does not contain a CloudEvent.
public static async ValueTask ReadCloudEventAsync(this HttpRequest httpRequest,
CloudEventFormatter formatter,
params CloudEventAttribute[] extensionAttributes)
@@ -28,7 +29,7 @@ namespace CloudNative.CloudEvents
if (HasCloudEventsContentType(httpRequest))
{
// TODO: Handle formatter being null
- return await formatter.DecodeStructuredEventAsync(httpRequest.Body, extensionAttributes).ConfigureAwait(false);
+ return await formatter.DecodeStructuredModeMessageAsync(httpRequest.Body, MimeUtilities.CreateContentTypeOrNull(httpRequest.ContentType), extensionAttributes).ConfigureAwait(false);
}
else
{
@@ -59,13 +60,11 @@ namespace CloudNative.CloudEvents
cloudEvent.DataContentType = httpRequest.ContentType;
if (httpRequest.Body is Stream body)
{
- // TODO: This is a bit ugly.
+ // TODO: This is a bit ugly. We have code in BinaryDataUtilities to handle this, but
+ // we'd rather not expose it...
var memoryStream = new MemoryStream();
await body.CopyToAsync(memoryStream).ConfigureAwait(false);
- if (memoryStream.Length != 0)
- {
- cloudEvent.Data = formatter.DecodeData(memoryStream.ToArray(), cloudEvent.DataContentType);
- }
+ formatter.DecodeBinaryModeEventData(memoryStream.ToArray(), cloudEvent);
}
return cloudEvent;
}
diff --git a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
index 6362559..c2dbc32 100644
--- a/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
+++ b/src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs
@@ -13,10 +13,27 @@ using System.Net.Mime;
namespace CloudNative.CloudEvents
{
///
- /// Formatter that implements the Avro Event Format
+ /// Formatter that implements the Avro Event Format.
///
+ ///
+ ///
+ /// This event formatter currently only supports structured-mode messages.
+ ///
+ ///
+ /// When encoding a CloudEvent, the data must be serializable as described in the
+ /// CloudEvents Avro Event
+ /// Format specification.
+ ///
+ ///
+ /// When decoding a CloudEvent, the property is populated directly from the
+ /// Avro record, so the value will have the natural Avro deserialization type for that data (which may
+ /// not be exactly the same as the type that was serialized).
+ ///
+ ///
public class AvroEventFormatter : CloudEventFormatter
{
+ private const string MediaTypeSuffix = "+avro";
+ private const string AttributeName = "attribute";
private const string DataName = "data";
private static readonly RecordSchema avroSchema;
private static readonly DefaultReader avroReader;
@@ -24,42 +41,46 @@ namespace CloudNative.CloudEvents
static AvroEventFormatter()
{
- // we're going to confidently assume that the embedded schema works. If not, type initialization
- // will fail and that's okay since the type is useless without the proper schema
+ // We're going to confidently assume that the embedded schema works. If not, type initialization
+ // will fail and that's okay since the type is useless without the proper schema.
using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
{
- avroSchema = (RecordSchema)RecordSchema.Parse(sr.ReadToEnd());
+ avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd());
}
avroReader = new DefaultReader(avroSchema, avroSchema);
avroWriter = new DefaultWriter(avroSchema);
}
- public const string MediaTypeSuffix = "+avro";
- public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes)
+ public override CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes)
{
var decoder = new BinaryDecoder(data);
var rawEvent = avroReader.Read(null, decoder);
return DecodeGenericRecord(rawEvent, extensionAttributes);
}
- public override CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable extensionAttributes) =>
- DecodeStructuredEvent(new MemoryStream(data), extensionAttributes);
+ public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) =>
+ DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
private CloudEvent DecodeGenericRecord(GenericRecord record, IEnumerable extensionAttributes)
{
- if (!record.TryGetValue("attribute", out var attrObj))
+ if (!record.TryGetValue(AttributeName, out var attrObj))
{
- return null;
+ throw new ArgumentException($"Record has no '{AttributeName}' field");
}
IDictionary recordAttributes = (IDictionary)attrObj;
- CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;
- if (recordAttributes.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var versionId) &&
- versionId is string versionIdString)
+ if (!recordAttributes.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var versionId) ||
+ !(versionId is string versionIdString))
{
- specVersion = CloudEventsSpecVersion.FromVersionId(versionIdString);
+ throw new ArgumentException("Specification version attribute is missing");
}
- var cloudEvent = new CloudEvent(specVersion, extensionAttributes);
+ CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionIdString);
+ if (version is null)
+ {
+ throw new ArgumentException($"Unsupported CloudEvents spec version '{versionIdString}'");
+ }
+
+ var cloudEvent = new CloudEvent(version, extensionAttributes);
cloudEvent.Data = record.TryGetValue(DataName, out var data) ? data : null;
foreach (var keyValuePair in recordAttributes)
@@ -78,6 +99,7 @@ namespace CloudNative.CloudEvents
// The Avro schema allows the value to be a Boolean, integer, string or bytes.
// Timestamps and URIs are represented as strings, so we just use SetAttributeFromString to handle those.
+ // TODO: This does mean that any extensions of these types must have been registered beforehand.
if (value is bool || value is int || value is byte[])
{
cloudEvent[key] = value;
@@ -95,12 +117,13 @@ namespace CloudNative.CloudEvents
return cloudEvent;
}
- public override byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
+ public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
- contentType = new ContentType(CloudEvent.MediaType+AvroEventFormatter.MediaTypeSuffix);
+ contentType = new ContentType(CloudEvent.MediaType + MediaTypeSuffix);
+ // We expect the Avro encoded to detect data types that can't be represented in the schema.
GenericRecord record = new GenericRecord(avroSchema);
- record.Add(DataName, SerializeData(cloudEvent.Data));
+ record.Add(DataName, cloudEvent.Data);
var recordAttributes = new Dictionary();
recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
@@ -114,33 +137,18 @@ namespace CloudNative.CloudEvents
: attribute.Format(value);
recordAttributes[attribute.Name] = avroValue;
}
- record.Add("attribute", recordAttributes);
+ record.Add(AttributeName, recordAttributes);
MemoryStream memStream = new MemoryStream();
BinaryEncoder encoder = new BinaryEncoder(memStream);
avroWriter.Write(record, encoder);
- return new Span(memStream.GetBuffer(), 0, (int)memStream.Length).ToArray();
- }
-
- ///
- /// Convert data into a suitable format for inclusion in an Avro record.
- /// TODO: Asynchronous version of this...
- ///
- private static object SerializeData(object data)
- {
- if (data is Stream stream)
- {
- var ms = new MemoryStream();
- stream.CopyTo(ms);
- return ms.ToArray();
- }
- return data;
+ return memStream.ToArray();
}
// TODO: Validate that this is correct...
- public override byte[] EncodeData(object value) =>
+ public override byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent) =>
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
- public override object DecodeData(byte[] value, string contentType) =>
+ public override void DecodeBinaryModeEventData(byte[] value, CloudEvent cloudEvent) =>
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
index a312db9..599058c 100644
--- a/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
+++ b/src/CloudNative.CloudEvents.Kafka/KafkaClientExtensions.cs
@@ -7,6 +7,7 @@ using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Net.Mime;
using System.Text;
namespace CloudNative.CloudEvents.Kafka
@@ -38,7 +39,7 @@ namespace CloudNative.CloudEvents.Kafka
// Structured mode
if (contentType?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true)
{
- cloudEvent = eventFormatter.DecodeStructuredEvent(message.Value, extensionAttributes);
+ cloudEvent = eventFormatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
}
else
{
diff --git a/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs b/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs
index bf6af3d..abaffff 100644
--- a/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs
+++ b/src/CloudNative.CloudEvents.Kafka/KafkaCloudEventMessage.cs
@@ -34,7 +34,7 @@ namespace CloudNative.CloudEvents.Kafka
if (contentMode == ContentMode.Structured)
{
- Value = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ Value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentType.MediaType));
}
else
diff --git a/src/CloudNative.CloudEvents.Mqtt/MqttClientExtensions.cs b/src/CloudNative.CloudEvents.Mqtt/MqttClientExtensions.cs
index 185be71..b986353 100644
--- a/src/CloudNative.CloudEvents.Mqtt/MqttClientExtensions.cs
+++ b/src/CloudNative.CloudEvents.Mqtt/MqttClientExtensions.cs
@@ -11,7 +11,8 @@ namespace CloudNative.CloudEvents.Mqtt
public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
CloudEventFormatter eventFormatter, params CloudEventAttribute[] extensionAttributes)
{
- return eventFormatter.DecodeStructuredEvent(message.Payload, extensionAttributes);
+ // TODO: Determine if there's a sensible content type we should apply.
+ return eventFormatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
}
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.Mqtt/MqttCloudEventMessage.cs b/src/CloudNative.CloudEvents.Mqtt/MqttCloudEventMessage.cs
index cabed99..781660b 100644
--- a/src/CloudNative.CloudEvents.Mqtt/MqttCloudEventMessage.cs
+++ b/src/CloudNative.CloudEvents.Mqtt/MqttCloudEventMessage.cs
@@ -11,7 +11,7 @@ namespace CloudNative.CloudEvents.Mqtt
{
public MqttCloudEventMessage(CloudEvent cloudEvent, CloudEventFormatter formatter)
{
- this.Payload = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ this.Payload = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
}
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs b/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs
index 2d85a18..2b9723b 100644
--- a/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs
+++ b/src/CloudNative.CloudEvents.NewtonsoftJson/JsonEventFormatter.cs
@@ -14,77 +14,133 @@ using System.Threading.Tasks;
namespace CloudNative.CloudEvents.NewtonsoftJson
{
///
- /// Formatter that implements the JSON Event Format
+ /// Formatter that implements the JSON Event Format.
///
+ ///
+ ///
+ /// When encoding CloudEvent data, the behavior depends on the data content type of the CloudEvent
+ /// and the type of the property value, following the rules below.
+ ///
+ ///
+ /// -
+ /// If the data value is null, the content is empty for a binary mode message, and neither the "data"
+ /// nor "data_base64" property is populated in a structured mode message.
+ ///
+ /// -
+ /// If the data content type is absent or has a media type of "application/json", the data is encoded as JSON.
+ /// If the data is already a , that is serialized directly as JSON. Otherwise, the data
+ /// is converted using the passed into the constructor, or a
+ /// default serializer.
+ ///
+ /// -
+ /// Otherwise, if the data content type has a media type beginning with "text/" and the data value is a string,
+ /// the data is serialized as a string.
+ ///
+ /// -
+ /// Otherwise, if the data value is a byte array, it is serialized either directly as binary data
+ /// (for binary mode messages) or as base64 data (for structured mode messages).
+ ///
+ /// -
+ /// Otherwise, the encoding operation fails.
+ ///
+ ///
+ ///
+ /// When decoding CloudEvent data, the following rules are used:
+ ///
+ ///
+ /// In a structured mode message, any data is either binary data within the "data_base64" property value,
+ /// or is a JSON token as the "data" property value. Binary data is represented as a byte array.
+ /// A JSON token is decoded as a string if is just a string value and the data content type is specified
+ /// and has a media type beginning with "text/". A JSON token representing the null value always
+ /// leads to a null data result. In any other situation, the JSON token is preserved as a
+ /// that can be used for further deserialization (e.g. to a specific CLR type).
+ ///
+ ///
+ /// In a binary mode message, the data is parsed based on the content type of the message. When the content
+ /// type is absent or has a media type of "application/json", the data is parsed as JSON, with the result as
+ /// a . When the content type has a media type beginning with "text/", the data is parsed
+ /// as a string. In all other cases, the data is left as a byte array.
+ ///
+ ///
public class JsonEventFormatter : CloudEventFormatter
{
+ private const string JsonMediaType = "application/json";
private const string DataBase64 = "data_base64";
private const string Data = "data";
- public const string MediaTypeSuffix = "+json";
+ private const string MediaTypeSuffix = "+json";
- public override async Task DecodeStructuredEventAsync(Stream data, IEnumerable extensionAttributes)
+ private readonly JsonSerializer serializer;
+
+ ///
+ /// Creates a JsonEventFormatter that uses a default .
+ ///
+ public JsonEventFormatter() : this(JsonSerializer.CreateDefault())
{
- var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true))
- {
- DateParseHandling = DateParseHandling.None
- };
+ }
+
+ ///
+ /// Creates a JsonEventFormatter that uses the specified
+ /// to serialize objects as JSON.
+ ///
+ public JsonEventFormatter(JsonSerializer serializer)
+ {
+ this.serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
+ }
+
+ public override async Task DecodeStructuredModeMessageAsync(Stream data, ContentType contentType, IEnumerable extensionAttributes)
+ {
+ var jsonReader = CreateJsonReader(data, contentType.GetEncoding());
var jObject = await JObject.LoadAsync(jsonReader).ConfigureAwait(false);
return DecodeJObject(jObject, extensionAttributes);
}
- public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes = null)
+ public override CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes)
{
- var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true))
- {
- DateParseHandling = DateParseHandling.None
- };
+ var jsonReader = CreateJsonReader(data, contentType.GetEncoding());
var jObject = JObject.Load(jsonReader);
return DecodeJObject(jObject, extensionAttributes);
}
- public override CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable extensionAttributes = null) =>
- DecodeStructuredEvent(new MemoryStream(data), extensionAttributes);
+ public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) =>
+ DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
private CloudEvent DecodeJObject(JObject jObject, IEnumerable extensionAttributes = null)
{
- CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;
- if (jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken))
+ if (!jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken)
+ || specVersionToken.Type != JTokenType.String)
{
- string versionId = (string)specVersionToken;
- specVersion = CloudEventsSpecVersion.FromVersionId(versionId);
- // TODO: Throw if specVersion is null?
+ throw new ArgumentException($"Structured mode content does not represent a CloudEvent");
+ }
+ var specVersion = CloudEventsSpecVersion.FromVersionId((string) specVersionToken);
+ if (specVersion is null)
+ {
+ throw new ArgumentException($"Unsupported CloudEvents spec version '{(string)specVersionToken}'");
}
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);
+ PopulateAttributesFromStructuredEvent(cloudEvent, jObject);
+ PopulateDataFromStructuredEvent(cloudEvent, jObject);
+
+ return cloudEvent;
+ }
+
+ private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JObject jObject)
+ {
foreach (var keyValuePair in jObject)
{
var key = keyValuePair.Key;
var value = keyValuePair.Value;
// Skip the spec version attribute, which we've already taken account of.
- if (key == CloudEventsSpecVersion.SpecVersionAttribute.Name)
+ // Data is handled later, when everything else (importantly, the data content type)
+ // has been populated.
+ if (key == CloudEventsSpecVersion.SpecVersionAttribute.Name ||
+ key == DataBase64 ||
+ key == Data)
{
continue;
}
- // TODO: Is the data_base64 name version-specific?
- if (specVersion == CloudEventsSpecVersion.V1_0 && key == DataBase64)
- {
- // Handle base64 encoded binaries
- cloudEvent.Data = Convert.FromBase64String((string)value);
- continue;
- }
- if (key == Data)
- {
- // FIXME: Deserialize where appropriate.
- // Consider whether there are any options here to consider beyond "string" and "object".
- // (e.g. arrays, numbers etc).
- // Note: the cast to "object" is important here, otherwise the string branch is implicitly
- // converted back to JToken...
- cloudEvent.Data = value.Type == JTokenType.String ? (string) value : (object) value;
- continue;
- }
-
var attribute = cloudEvent.GetAttribute(key);
// Set the attribute in the event, taking account of mismatches between the type in the JObject
@@ -96,18 +152,53 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
string attributeValue = value.Type switch
{
JTokenType.String => (string)value,
+ JTokenType.Boolean => CloudEventAttributeType.Boolean.Format((bool)value),
JTokenType.Null => null, // TODO: Check we want to do this. It's a bit weird.
JTokenType.Integer => CloudEventAttributeType.Integer.Format((int)value),
_ => throw new ArgumentException($"Invalid token type '{value.Type}' for CloudEvent attribute")
};
-
+ // Note: we *could* infer an extension type of integer and Boolean, but not other extension types.
+ // (We don't want to assume that everything that looks like a timestamp is a timestamp, etc.)
+ // Stick to strings for consistency.
cloudEvent.SetAttributeFromString(key, attributeValue);
}
-
- return cloudEvent;
}
- public override byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
+ private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JObject jObject)
+ {
+ jObject.TryGetValue(Data, out var dataToken);
+ jObject.TryGetValue(DataBase64, out var dataBase64Token);
+ if (dataToken is null && dataBase64Token is null)
+ {
+ return;
+ }
+ if (dataToken is object && dataBase64Token is object)
+ {
+ throw new ArgumentException($"Structured mode content cannot contain both '{Data}' and '{DataBase64}' properties.");
+ }
+ if (dataBase64Token is object)
+ {
+ if (dataBase64Token.Type != JTokenType.String)
+ {
+ throw new ArgumentException($"Structured mode property '{DataBase64}' must be a string, when present.");
+ }
+ cloudEvent.Data = Convert.FromBase64String((string) dataBase64Token);
+ }
+ else
+ {
+ if (dataToken.Type == JTokenType.Null)
+ {
+ return;
+ }
+ // Convert JSON string tokens to string values when the content type suggests that's appropriate,
+ // otherwise leave the token as it is.
+ cloudEvent.Data = dataToken.Type == JTokenType.String && cloudEvent.DataContentType?.StartsWith("text/") == true
+ ? (string) dataToken
+ : (object) dataToken; // Deliberately cast to object to avoid any implicit conversions
+ }
+ }
+
+ public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
{
contentType = new ContentType("application/cloudevents+json")
{
@@ -115,21 +206,23 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
};
JObject jObject = new JObject();
+ jObject[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
var attributes = cloudEvent.GetPopulatedAttributes();
foreach (var keyValuePair in attributes)
{
jObject[keyValuePair.Key.Name] = JToken.FromObject(keyValuePair.Value);
}
- // FIXME: This is all a bit arbitrary.
if (cloudEvent.Data is object)
{
- // FIXME: This assumes there's nothing beyond the media type...
- if (cloudEvent.DataContentType == "application/json")
+ ContentType dataContentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
+ if (dataContentType.MediaType == JsonMediaType)
{
- jObject[Data] = JToken.FromObject(cloudEvent.Data);
+ jObject[Data] = cloudEvent.Data is JToken token
+ ? token
+ : JToken.FromObject(cloudEvent.Data);
}
- else if (cloudEvent.Data is string text && cloudEvent.DataContentType?.StartsWith("text/") == true)
+ else if (cloudEvent.Data is string text && dataContentType.MediaType.StartsWith("text/"))
{
jObject[Data] = text;
}
@@ -139,7 +232,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
}
else
{
- throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type {cloudEvent.DataContentType}");
+ throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'");
}
}
@@ -147,39 +240,54 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
}
// TODO: How should the caller know whether the result is "raw" or should be stored in data_base64?
- public override byte[] EncodeData(object value)
+ public override byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent)
{
- // TODO: Check this is what we want.
- // In particular, if this is just other text or binary data, rather than JSON, what does it
- // mean to have a JSON event format?
- string json = value switch
+ if (cloudEvent.Data is null)
{
- JToken token => token.ToString(), // Formatting?
- string text => text,
- byte[] data => Convert.ToBase64String(data),
- null => null,
- _ => JsonConvert.SerializeObject(value)
- };
- return json is null ? new byte[0] : Encoding.UTF8.GetBytes(json);
+ return Array.Empty();
+ }
+ ContentType contentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
+ if (contentType.MediaType == JsonMediaType)
+ {
+ string json = JsonConvert.SerializeObject(cloudEvent.Data);
+ return contentType.GetEncoding().GetBytes(json);
+ }
+ if (contentType.MediaType.StartsWith("text/") && cloudEvent.Data is string text)
+ {
+ return contentType.GetEncoding().GetBytes(text);
+ }
+ if (cloudEvent.Data is byte[] bytes)
+ {
+ return bytes;
+ }
+ throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'");
}
- public override object DecodeData(byte[] value, string contentType)
+ public override void DecodeBinaryModeEventData(byte[] value, CloudEvent cloudEvent)
{
- if (contentType == "application/json")
+ ContentType contentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
+
+ Encoding encoding = contentType.GetEncoding();
+
+ if (contentType.MediaType == JsonMediaType)
{
- var jsonReader = new JsonTextReader(new StreamReader(new MemoryStream(value), Encoding.UTF8, true, 8192, true))
- {
- DateParseHandling = DateParseHandling.DateTimeOffset
- };
- return JToken.Load(jsonReader);
+ var jsonReader = CreateJsonReader(new MemoryStream(value), encoding);
+ cloudEvent.Data = JToken.Load(jsonReader);
}
- else if (contentType?.StartsWith("text/") == true)
+ else if (contentType.MediaType.StartsWith("text/") == true)
{
- // FIXME: Even if we want to do this, we really need to know if there's a content encoding.
- return Encoding.UTF8.GetString(value);
+ cloudEvent.Data = encoding.GetString(value);
+ }
+ else
+ {
+ cloudEvent.Data = value;
}
- // TODO: Clone?
- return value;
}
+
+ private static JsonReader CreateJsonReader(Stream stream, Encoding encoding) =>
+ new JsonTextReader(new StreamReader(stream, encoding ?? Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 8192, leaveOpen: true))
+ {
+ DateParseHandling = DateParseHandling.None
+ };
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/CloudEventFormatter.cs b/src/CloudNative.CloudEvents/CloudEventFormatter.cs
index 3c27b2e..2e30ff5 100644
--- a/src/CloudNative.CloudEvents/CloudEventFormatter.cs
+++ b/src/CloudNative.CloudEvents/CloudEventFormatter.cs
@@ -11,61 +11,107 @@ using System.Threading.Tasks;
namespace CloudNative.CloudEvents
{
///
- /// Implemented by formatters
+ /// Performs CloudEvent conversions as part of encoding and decoding messages for protocol bindings.
///
+ ///
+ ///
+ /// Event formatters are responsible for complete CloudEvent encoding and decoding for structured-mode messages (where
+ /// all the CloudEvent information is represented within the message body), and data-only encoding and decoding
+ /// for binary-mode messages (where CloudEvent attributes are represented in message metadata, and the CloudEvent data
+ /// is represented in the message body).
+ ///
+ ///
+ /// Each event formatter type is responsible for documenting what types of value are acceptable for the
+ /// property in CloudEvents it is asked to encode, and likewise what types of value will be present in the same property
+ /// when it is asked to decode a message. Event formatters should aim to be as consistent as possible with respect to data handling
+ /// between structured and binary modes, although this is not always possible as the structured mode representation may contain
+ /// more hints around how to interpret the data than the binary mode representation. Inconsistencies should be carefully
+ /// noted so that consumers can write robust code.
+ ///
+ ///
+ /// An event format is often naturally associated with a particular kind of data, but it is not limited to working with
+ /// that kind. For example, the JSON event format allows JSON data to be stored particularly naturally within the structured-mode
+ /// message body (which is itself JSON), but it is still able to handle arbitrary binary or text data.
+ ///
+ ///
public abstract class CloudEventFormatter
{
///
- /// Decode a structured event from a stream. The default implementation copies the
- /// content of the stream into a byte array before passing it to ,
+ /// Asynchronously decodes a CloudEvent from a structured-mode message body, represented as a stream. The default implementation copies the
+ /// content of the stream into a byte array before passing it to
/// but this can be overridden by event formatters that can decode a stream more efficiently.
///
- ///
- ///
- ///
- public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensionAttributes)
+ /// The data within the message body. Must not be null.
+ /// The content type of the message, or null if no content type is known.
+ /// Typically this is a content type with a media type of "application/cloudevents"; the additional
+ /// information such as the charset parameter may be needed in order to decode the data.
+ /// The extension attributes to use when populating the CloudEvent. May be null.
+ /// The decoded CloudEvent.
+ public virtual CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable extensionAttributes)
{
var bytes = BinaryDataUtilities.ToByteArray(data);
- return DecodeStructuredEvent(bytes, extensionAttributes);
+ return DecodeStructuredModeMessage(bytes, contentType, extensionAttributes);
}
///
- /// Decode a structured event from a stream. The default implementation asynchronously copies the
- /// content of the stream into a byte array before passing it to ,
+ /// Decodes a CloudEvent from a structured-mode message body, represented as a stream. The default implementation asynchronously copies the
+ /// content of the stream into a byte array before passing it to
/// but this can be overridden by event formatters that can decode a stream more efficiently.
///
- ///
- ///
- ///
- public virtual async Task DecodeStructuredEventAsync(Stream data, IEnumerable extensionAttributes)
+ /// The data within the message body. Must not be null.
+ /// The content type of the message, or null if no content type is known.
+ /// Typically this is a content type with a media type of "application/cloudevents"; the additional
+ /// information such as the charset parameter may be needed in order to decode the data.
+ /// The extension attributes to use when populating the CloudEvent. May be null.
+ /// The CloudEvent derived from the structured data.
+ public virtual async Task DecodeStructuredModeMessageAsync(Stream data, ContentType contentType, IEnumerable extensionAttributes)
{
var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false);
- return DecodeStructuredEvent(bytes, extensionAttributes);
+ return DecodeStructuredModeMessage(bytes, contentType, extensionAttributes);
}
- // TODO: Remove either this one or the stream one? It seems unnecessary to have both.
-
///
- /// Decode a structured event from a byte array
+ /// Decodes a CloudEvent from a structured-mode message body, represented as a byte array.
///
- ///
- ///
- ///
- public virtual CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable extensionAttributes) =>
+ /// The data within the message body. Must not be null.
+ /// The content type of the message, or null if no content type is known.
+ /// Typically this is a content type with a media type of "application/cloudevents"; the additional
+ /// information such as the charset parameter may be needed in order to decode the data.
+ /// The extension attributes to use when populating the CloudEvent. May be null.
+ /// The CloudEvent derived from the structured data.
+ public virtual CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable extensionAttributes) =>
throw new NotImplementedException();
///
- /// Encode an structured event into a byte array
+ /// Encodes a CloudEvent as the body of a structured-mode message.
///
- ///
- ///
- ///
- public virtual byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType) =>
+ /// The CloudEvent to encode. Must not be null.
+ /// On successful return, the content type of the structured-mode data.
+ /// Must not be null (on return).
+ /// The structured-mode representation of the CloudEvent.
+ public virtual byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) =>
throw new NotImplementedException();
- // TODO: Work out whether this is what we want, and whether to potentially
- // separate it into a separate interface.
- public virtual byte[] EncodeData(object value) => throw new NotImplementedException();
- public virtual object DecodeData(byte[] value, string contentType) => throw new NotImplementedException();
+ ///
+ /// Encodes the data from in a manner suitable for a binary mode message.
+ ///
+ /// The data in the given CloudEvent cannot be encoded by this
+ /// event formatter.
+ /// The binary-mode representation of the CloudEvent.
+ public virtual byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent) =>
+ throw new NotImplementedException();
+
+ ///
+ /// Decodes the given data obtained from a binary-mode message, populating the
+ /// property of . Other attributes within the CloudEvent may be used to inform
+ /// the interpretation of the data. This method is expected to be called after all other aspects of the CloudEvent
+ /// have been populated.
+ ///
+ /// The data from the message. Must not be null, but may be empty.
+ /// The CloudEvent whose Data property should be populated. Must not be null.
+ /// The data in the given CloudEvent cannot be decoded by this
+ /// event formatter.
+ public virtual void DecodeBinaryModeEventData(byte[] data, CloudEvent cloudEvent) =>
+ throw new NotImplementedException();
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/Http/CloudEventHttpContent.cs b/src/CloudNative.CloudEvents/Http/CloudEventHttpContent.cs
index 103d7d8..467a4d0 100644
--- a/src/CloudNative.CloudEvents/Http/CloudEventHttpContent.cs
+++ b/src/CloudNative.CloudEvents/Http/CloudEventHttpContent.cs
@@ -6,19 +6,20 @@ using System;
using System.IO;
using System.Net;
using System.Net.Http;
-using System.Net.Http.Headers;
using System.Net.Mime;
using System.Threading.Tasks;
namespace CloudNative.CloudEvents.Http
{
+ // TODO: Do we really need to have a subclass here? How about a static factory method instead?
+
///
/// This class is for use with `HttpClient` and constructs content and headers for
/// a HTTP request from a CloudEvent.
///
public class CloudEventHttpContent : HttpContent
{
- IInnerContent inner;
+ private readonly InnerByteArrayContent inner;
///
/// Constructor
@@ -28,70 +29,44 @@ namespace CloudNative.CloudEvents.Http
/// Event formatter
public CloudEventHttpContent(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
{
- if (contentMode == ContentMode.Structured)
+ byte[] content;
+ ContentType contentType;
+ switch (contentMode)
{
- inner = new InnerByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType));
- // This is optional in the specification, but can be useful.
- MapHeaders(cloudEvent, includeDataContentType: true);
- Headers.ContentType = new MediaTypeHeaderValue(contentType.MediaType);
- return;
- }
+ case ContentMode.Structured:
+ content = formatter.EncodeStructuredModeMessage(cloudEvent, out contentType);
+ // This is optional in the specification, but can be useful.
+ MapHeaders(cloudEvent, includeDataContentType: true);
+ break;
+ case ContentMode.Binary:
+ content = formatter.EncodeBinaryModeEventData(cloudEvent);
+ contentType = MimeUtilities.CreateContentTypeOrNull(cloudEvent.DataContentType);
+ MapHeaders(cloudEvent, includeDataContentType: false);
+ break;
+ default:
+ throw new ArgumentException($"Unsupported content mode: {contentMode}");
- // TODO: Shouldn't we use the formatter in all cases? If I have a JSON formatter and
- // If we specify that the the data is a byte array, I'd expect to end up with a base64-encoded representation...
- if (cloudEvent.Data is byte[])
- {
- inner = new InnerByteArrayContent((byte[])cloudEvent.Data);
}
- else if (cloudEvent.Data is string)
+ inner = new InnerByteArrayContent(content);
+ if (contentType is object)
{
- inner = new InnerStringContent((string)cloudEvent.Data);
+ Headers.ContentType = contentType.ToMediaTypeHeaderValue();
}
- else if (cloudEvent.Data is Stream)
- {
- inner = new InnerStreamContent((Stream)cloudEvent.Data);
- }
- else
- {
- inner = new InnerByteArrayContent(formatter.EncodeData(cloudEvent.Data));
- }
-
- // We don't require a data content type if there isn't any data.
- // We may not be able to tell whether the data is empty or not, but we're lenient
- // in that case.
- var dataContentType = cloudEvent.DataContentType;
- if (dataContentType is object)
- {
- var mediaType = new ContentType(dataContentType).MediaType;
- Headers.ContentType = new MediaTypeHeaderValue(mediaType);
- }
- else if (TryComputeLength(out var length) && length != 0L)
+ else if (content.Length != 0)
{
throw new ArgumentException(Strings.ErrorContentTypeUnspecified, nameof(cloudEvent));
}
- MapHeaders(cloudEvent, includeDataContentType: false);
}
- private interface IInnerContent
- {
- Task InnerSerializeToStreamAsync(Stream stream, TransportContext context);
- bool InnerTryComputeLength(out long length);
- }
+ protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
+ inner.InnerSerializeToStreamAsync(stream, context);
- protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
- {
- return inner.InnerSerializeToStreamAsync(stream, context);
- }
-
- protected override bool TryComputeLength(out long length)
- {
- return inner.InnerTryComputeLength(out length);
- }
+ protected override bool TryComputeLength(out long length) =>
+ inner.InnerTryComputeLength(out length);
private void MapHeaders(CloudEvent cloudEvent, bool includeDataContentType)
{
- Headers.Add(HttpUtilities.HttpHeaderPrefix + CloudEventsSpecVersion.SpecVersionAttribute.Name,
- HttpUtilities.EncodeHeaderValue(cloudEvent.SpecVersion.VersionId));
+ Headers.Add(HttpUtilities.SpecVersionHttpHeader, HttpUtilities.EncodeHeaderValue(cloudEvent.SpecVersion.VersionId));
foreach (var attributeAndValue in cloudEvent.GetPopulatedAttributes())
{
CloudEventAttribute attribute = attributeAndValue.Key;
@@ -115,7 +90,7 @@ namespace CloudNative.CloudEvents.Http
/// This inner class is required to get around the 'protected'-ness of the
/// override functions of HttpContent for enabling containment/delegation
///
- class InnerByteArrayContent : ByteArrayContent, IInnerContent
+ class InnerByteArrayContent : ByteArrayContent
{
public InnerByteArrayContent(byte[] content) : base(content)
{
@@ -131,47 +106,5 @@ namespace CloudNative.CloudEvents.Http
return base.TryComputeLength(out length);
}
}
-
- ///
- /// This inner class is required to get around the 'protected'-ness of the
- /// override functions of HttpContent for enabling containment/delegation
- ///
- class InnerStreamContent : StreamContent, IInnerContent
- {
- public InnerStreamContent(Stream content) : base(content)
- {
- }
-
- public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
- {
- return base.SerializeToStreamAsync(stream, context);
- }
-
- public bool InnerTryComputeLength(out long length)
- {
- return base.TryComputeLength(out length);
- }
- }
-
- ///
- /// This inner class is required to get around the 'protected'-ness of the
- /// override functions of HttpContent for enabling containment/delegation
- ///
- class InnerStringContent : StringContent, IInnerContent
- {
- public InnerStringContent(string content) : base(content)
- {
- }
-
- public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
- {
- return base.SerializeToStreamAsync(stream, context);
- }
-
- public bool InnerTryComputeLength(out long length)
- {
- return base.TryComputeLength(out length);
- }
- }
}
}
\ No newline at end of file
diff --git a/src/CloudNative.CloudEvents/Http/HttpClientExtensions.cs b/src/CloudNative.CloudEvents/Http/HttpClientExtensions.cs
index 7f82292..d99cb6d 100644
--- a/src/CloudNative.CloudEvents/Http/HttpClientExtensions.cs
+++ b/src/CloudNative.CloudEvents/Http/HttpClientExtensions.cs
@@ -8,6 +8,7 @@ using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
+using System.Net.Mime;
using System.Threading.Tasks;
namespace CloudNative.CloudEvents.Http
@@ -102,7 +103,7 @@ namespace CloudNative.CloudEvents.Http
{
// FIXME: Handle no formatter being specified.
var stream = await content.ReadAsStreamAsync().ConfigureAwait(false);
- return await formatter.DecodeStructuredEventAsync(stream, extensionAttributes).ConfigureAwait(false);
+ return await formatter.DecodeStructuredModeMessageAsync(stream, content.Headers.ContentType.ToContentType(), extensionAttributes).ConfigureAwait(false);
}
else
{
@@ -136,7 +137,7 @@ namespace CloudNative.CloudEvents.Http
// TODO: Should this just be the media type? We probably need to take a full audit of this...
cloudEvent.DataContentType = content.Headers?.ContentType?.ToString();
var data = await content.ReadAsByteArrayAsync().ConfigureAwait(false);
- cloudEvent.Data = formatter.DecodeData(data, cloudEvent.DataContentType);
+ formatter.DecodeBinaryModeEventData(data, cloudEvent);
}
return cloudEvent;
}
diff --git a/src/CloudNative.CloudEvents/Http/HttpListenerExtensions.cs b/src/CloudNative.CloudEvents/Http/HttpListenerExtensions.cs
index fad73d5..cc948a7 100644
--- a/src/CloudNative.CloudEvents/Http/HttpListenerExtensions.cs
+++ b/src/CloudNative.CloudEvents/Http/HttpListenerExtensions.cs
@@ -5,6 +5,7 @@
using System;
using System.IO;
using System.Net;
+using System.Net.Mime;
using System.Threading.Tasks;
namespace CloudNative.CloudEvents.Http
@@ -30,17 +31,17 @@ namespace CloudNative.CloudEvents.Http
{
if (contentMode == ContentMode.Structured)
{
- var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ var buffer = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
httpListenerResponse.ContentType = contentType.ToString();
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
return httpListenerResponse.OutputStream.WriteAsync(buffer, 0, buffer.Length);
}
-
- Stream stream = HttpUtilities.MapDataAttributeToStream(cloudEvent, formatter);
+
// TODO: Check the defaulting to JSON here...
httpListenerResponse.ContentType = cloudEvent.DataContentType?.ToString() ?? "application/json";
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
- return stream.CopyToAsync(httpListenerResponse.OutputStream);
+ byte[] content = formatter.EncodeBinaryModeEventData(cloudEvent);
+ return httpListenerResponse.OutputStream.WriteAsync(content, 0, content.Length);
}
// TODO: Do we want this? It's not about CloudEvents...
@@ -97,14 +98,16 @@ namespace CloudNative.CloudEvents.Http
/// Listener request
///
/// List of extension instances
- /// A CloudEvent instance or 'null' if the request message doesn't hold a CloudEvent
+ /// The CloudEvent corresponding to the given request.
+ /// The request does not represent a CloudEvent,
+ /// or the event's specification version is not supported,
+ /// or the event formatter cannot interpret it.
public static CloudEvent ToCloudEvent(this HttpListenerRequest httpListenerRequest,
CloudEventFormatter formatter, params CloudEventAttribute[] extensionAttributes)
{
if (HasCloudEventsContentType(httpListenerRequest))
{
- // FIXME: Handle no formatter being specified.
- return formatter.DecodeStructuredEvent(httpListenerRequest.InputStream, extensionAttributes);
+ return formatter.DecodeStructuredModeMessage(httpListenerRequest.InputStream, MimeUtilities.CreateContentTypeOrNull(httpListenerRequest.ContentType), extensionAttributes);
}
else
{
@@ -132,16 +135,11 @@ namespace CloudNative.CloudEvents.Http
cloudEvent.SetAttributeFromString(attributeName, attributeValue);
}
- // TODO: Check that this doesn't come through as a header already
+ // The data content type should not have been set via a "ce-" header; instead,
+ // it's in the regular content type.
cloudEvent.DataContentType = httpListenerRequest.ContentType;
- // TODO: This is a bit ugly.
- var memoryStream = new MemoryStream();
- httpListenerRequest.InputStream.CopyTo(memoryStream);
- if (memoryStream.Length != 0)
- {
- cloudEvent.Data = formatter.DecodeData(memoryStream.ToArray(), cloudEvent.DataContentType);
- }
+ formatter.DecodeBinaryModeEventData(BinaryDataUtilities.ToByteArray(httpListenerRequest.InputStream), cloudEvent);
return cloudEvent;
}
}
diff --git a/src/CloudNative.CloudEvents/Http/HttpUtilities.cs b/src/CloudNative.CloudEvents/Http/HttpUtilities.cs
index b4e26bc..df5aca9 100644
--- a/src/CloudNative.CloudEvents/Http/HttpUtilities.cs
+++ b/src/CloudNative.CloudEvents/Http/HttpUtilities.cs
@@ -6,6 +6,8 @@ using System;
using System.IO;
using System.Net;
using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Net.Mime;
using System.Text;
using System.Threading.Tasks;
@@ -21,15 +23,6 @@ namespace CloudNative.CloudEvents.Http
public const string SpecVersionHttpHeader = HttpHeaderPrefix + "specversion";
- internal static Stream MapDataAttributeToStream(CloudEvent cloudEvent, CloudEventFormatter formatter) =>
- cloudEvent.Data switch
- {
- byte[] bytes => new MemoryStream(bytes),
- string text => new MemoryStream(Encoding.UTF8.GetBytes(text)),
- Stream stream => stream,
- object other => new MemoryStream(formatter.EncodeData(other))
- };
-
///
/// Checks whether the given HTTP header name starts with "ce-", and if so, converts it into
/// a lower-case attribute name.
diff --git a/src/CloudNative.CloudEvents/Http/HttpWebExtensions.cs b/src/CloudNative.CloudEvents/Http/HttpWebExtensions.cs
index a3cf8ee..febe1f4 100644
--- a/src/CloudNative.CloudEvents/Http/HttpWebExtensions.cs
+++ b/src/CloudNative.CloudEvents/Http/HttpWebExtensions.cs
@@ -29,16 +29,16 @@ namespace CloudNative.CloudEvents.Http
{
if (contentMode == ContentMode.Structured)
{
- var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ var buffer = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
httpWebRequest.ContentType = contentType.ToString();
await httpWebRequest.GetRequestStream().WriteAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
return;
}
- Stream stream = HttpUtilities.MapDataAttributeToStream(cloudEvent, formatter);
httpWebRequest.ContentType = cloudEvent.DataContentType?.ToString() ?? "application/json";
MapAttributesToWebRequest(cloudEvent, httpWebRequest);
- await stream.CopyToAsync(httpWebRequest.GetRequestStream());
+ byte[] content = formatter.EncodeBinaryModeEventData(cloudEvent);
+ await httpWebRequest.GetRequestStream().WriteAsync(content, 0, content.Length).ConfigureAwait(false);
}
static void MapAttributesToWebRequest(CloudEvent cloudEvent, HttpWebRequest httpWebRequest)
diff --git a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
index 9aff548..419afaf 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs
@@ -5,8 +5,8 @@
using CloudNative.CloudEvents.NewtonsoftJson;
using System;
using System.Net.Mime;
-using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
using static CloudNative.CloudEvents.UnitTests.TestHelpers;
namespace CloudNative.CloudEvents.Avro.UnitTests
@@ -30,9 +30,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
{
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
- var avroData = avroFormatter.EncodeStructuredEvent(cloudEvent, out var contentType);
- var cloudEvent2 = avroFormatter.DecodeStructuredEvent(avroData, new CloudEventAttribute[0]);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
+ var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
+ var cloudEvent2 = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, extensionAttributes: null);
Assert.Equal(cloudEvent2.SpecVersion, cloudEvent.SpecVersion);
Assert.Equal(cloudEvent2.Type, cloudEvent.Type);
@@ -48,9 +48,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
{
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
- var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
- var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
- var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData, new CloudEventAttribute[0]);
+ var cloudEventJ = jsonFormatter.DecodeStructuredModeText(jsonv10);
+ var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEventJ, out var contentType);
+ var cloudEvent = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, extensionAttributes: null);
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", cloudEvent.Type);
@@ -69,9 +69,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
var jsonFormatter = new JsonEventFormatter();
var avroFormatter = new AvroEventFormatter();
var extensionAttribute = CloudEventAttribute.CreateExtension("comexampleextension1", CloudEventAttributeType.String);
- var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10), new[] { extensionAttribute });
- var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
- var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData, new[] { extensionAttribute });
+ var cloudEventJ = jsonFormatter.DecodeStructuredModeText(jsonv10, new[] { extensionAttribute });
+ var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEventJ, out var contentType);
+ var cloudEvent = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, new[] { extensionAttribute });
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", cloudEvent.Type);
diff --git a/test/CloudNative.CloudEvents.UnitTests/CloudEventFormatterExtensions.cs b/test/CloudNative.CloudEvents.UnitTests/CloudEventFormatterExtensions.cs
new file mode 100644
index 0000000..d7c0e1f
--- /dev/null
+++ b/test/CloudNative.CloudEvents.UnitTests/CloudEventFormatterExtensions.cs
@@ -0,0 +1,23 @@
+// Copyright 2021 Cloud Native Foundation.
+// Licensed under the Apache 2.0 license.
+// See LICENSE file in the project root for full license information.
+
+using System.Collections.Generic;
+using System.Text;
+
+namespace CloudNative.CloudEvents.UnitTests
+{
+ ///
+ /// Extension methods for CloudEventFormatters to simplify testing.
+ /// Often in tests we have structured mode data as strings, and usually the content type isn't important,
+ /// so it's useful to be able to just decode that string directly.
+ ///
+ internal static class CloudEventFormatterExtensions
+ {
+ internal static CloudEvent DecodeStructuredModeText(this CloudEventFormatter eventFormatter, string text) =>
+ eventFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(text), contentType: null, extensionAttributes: null);
+
+ internal static CloudEvent DecodeStructuredModeText(this CloudEventFormatter eventFormatter, string text, IEnumerable extensionAttributes) =>
+ eventFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(text), contentType: null, extensionAttributes);
+ }
+}
diff --git a/test/CloudNative.CloudEvents.UnitTests/Extensions/DistributedTracingTest.cs b/test/CloudNative.CloudEvents.UnitTests/Extensions/DistributedTracingTest.cs
index 7130d05..c45ec83 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Extensions/DistributedTracingTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Extensions/DistributedTracingTest.cs
@@ -5,6 +5,7 @@
using CloudNative.CloudEvents.NewtonsoftJson;
using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
namespace CloudNative.CloudEvents.Extensions.UnitTests
{
@@ -27,7 +28,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void ParseJson()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), DistributedTracing.AllAttributes);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, DistributedTracing.AllAttributes);
Assert.Equal(SampleParent, cloudEvent[DistributedTracing.TraceParentAttribute]);
Assert.Equal(SampleParent, cloudEvent.GetTraceParent());
@@ -39,9 +40,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void Transcode()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
- var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, DistributedTracing.AllAttributes);
+ var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
+ var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, DistributedTracing.AllAttributes);
Assert.Equal(SampleParent, cloudEvent[DistributedTracing.TraceParentAttribute]);
Assert.Equal(SampleParent, cloudEvent.GetTraceParent());
diff --git a/test/CloudNative.CloudEvents.UnitTests/Extensions/PartitioningTest.cs b/test/CloudNative.CloudEvents.UnitTests/Extensions/PartitioningTest.cs
index 6fcbb05..3c1e7b7 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Extensions/PartitioningTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Extensions/PartitioningTest.cs
@@ -5,6 +5,7 @@
using CloudNative.CloudEvents.NewtonsoftJson;
using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
namespace CloudNative.CloudEvents.Extensions.UnitTests
{
@@ -24,7 +25,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void ParseJson()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson);
Assert.Equal("abc", cloudEvent["partitionkey"]);
Assert.Equal("abc", cloudEvent[Partitioning.PartitionKeyAttribute]);
Assert.Equal("abc", cloudEvent.GetPartitionKey());
@@ -34,9 +35,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void Transcode()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
- var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData);
+ var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
+ var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, null);
Assert.Equal("abc", cloudEvent["partitionkey"]);
Assert.Equal("abc", cloudEvent[Partitioning.PartitionKeyAttribute]);
diff --git a/test/CloudNative.CloudEvents.UnitTests/Extensions/SamplingTest.cs b/test/CloudNative.CloudEvents.UnitTests/Extensions/SamplingTest.cs
index c1c0ea2..a2ba6eb 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Extensions/SamplingTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Extensions/SamplingTest.cs
@@ -6,6 +6,7 @@ using CloudNative.CloudEvents.NewtonsoftJson;
using System;
using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
namespace CloudNative.CloudEvents.Extensions.UnitTests
{
@@ -24,7 +25,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void SamplingParse()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), Sampling.AllAttributes);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, Sampling.AllAttributes);
Assert.Equal(1, cloudEvent["sampledrate"]);
Assert.Equal(1, cloudEvent.GetSampledRate());
@@ -34,12 +35,12 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void SamplingJsonTranscode()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
+ var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
// Note that the value is just a string here, as we don't know the attribute type.
Assert.Equal("1", cloudEvent1["sampledrate"]);
- var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, Sampling.AllAttributes);
+ var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, Sampling.AllAttributes);
// When parsing with the attributes in place, the value is propagated as an integer.
Assert.Equal(1, cloudEvent["sampledrate"]);
diff --git a/test/CloudNative.CloudEvents.UnitTests/Extensions/SequenceTest.cs b/test/CloudNative.CloudEvents.UnitTests/Extensions/SequenceTest.cs
index f350d33..a6f1578 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Extensions/SequenceTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Extensions/SequenceTest.cs
@@ -6,6 +6,7 @@ using CloudNative.CloudEvents.NewtonsoftJson;
using System;
using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
namespace CloudNative.CloudEvents.Extensions.UnitTests
{
@@ -25,7 +26,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void Parse()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), Sequence.AllAttributes);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, Sequence.AllAttributes);
Assert.Equal("Integer", cloudEvent[Sequence.SequenceTypeAttribute]);
Assert.Equal("25", cloudEvent[Sequence.SequenceAttribute]);
@@ -35,9 +36,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
public void Transcode()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
- var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out var contentType);
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, Sequence.AllAttributes);
+ var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
+ var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, Sequence.AllAttributes);
Assert.Equal("Integer", cloudEvent[Sequence.SequenceTypeAttribute]);
Assert.Equal("25", cloudEvent[Sequence.SequenceAttribute]);
diff --git a/test/CloudNative.CloudEvents.UnitTests/Http/HttpClientExtensionsTest.cs b/test/CloudNative.CloudEvents.UnitTests/Http/HttpClientExtensionsTest.cs
index 36a4666..7e77aca 100644
--- a/test/CloudNative.CloudEvents.UnitTests/Http/HttpClientExtensionsTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/Http/HttpClientExtensionsTest.cs
@@ -145,7 +145,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
}
[Fact]
- async Task HttpStructuredClientReceiveTest()
+ public async Task HttpStructuredClientReceiveTest()
{
string ctx = Guid.NewGuid().ToString();
PendingRequests.TryAdd(ctx, async context =>
@@ -200,7 +200,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
}
[Fact]
- async Task HttpStructuredClientSendTest()
+ public async Task HttpStructuredClientSendTest()
{
var cloudEvent = new CloudEvent
{
@@ -231,7 +231,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
Assert.Equal("2018-04-05T17:31:00Z", headers["ce-time"]);
// Note that datacontenttype is mapped in this case, but would not be included in binary mode.
Assert.Equal("text/xml", headers["ce-datacontenttype"]);
- Assert.Equal("application/cloudevents+json", context.Request.ContentType);
+ Assert.Equal("application/cloudevents+json; charset=utf-8", context.Request.ContentType);
Assert.Equal("value", headers["ce-comexampleextension1"]);
// The non-ASCII attribute value should have been URL-encoded using UTF-8 for the header.
Assert.Equal("%C3%A6%C3%B8%C3%A5", headers["ce-utf8examplevalue"]);
diff --git a/test/CloudNative.CloudEvents.UnitTests/NewtonsoftJson/JsonEventFormatterTest.cs b/test/CloudNative.CloudEvents.UnitTests/NewtonsoftJson/JsonEventFormatterTest.cs
index 2afbbee..f15ea4a 100644
--- a/test/CloudNative.CloudEvents.UnitTests/NewtonsoftJson/JsonEventFormatterTest.cs
+++ b/test/CloudNative.CloudEvents.UnitTests/NewtonsoftJson/JsonEventFormatterTest.cs
@@ -6,6 +6,7 @@ using System;
using System.Net.Mime;
using System.Text;
using Xunit;
+using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
using static CloudNative.CloudEvents.UnitTests.TestHelpers;
namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
@@ -30,11 +31,11 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
public void ReserializeTest10()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
- var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent, out var contentType);
+ var cloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
+ var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
Assert.Equal("application/cloudevents+json", contentType.MediaType);
- var cloudEvent2 = jsonFormatter.DecodeStructuredEvent(jsonData);
+ var cloudEvent2 = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType: null, Array.Empty());
Assert.Equal(cloudEvent2.SpecVersion, cloudEvent.SpecVersion);
Assert.Equal(cloudEvent2.Type, cloudEvent.Type);
@@ -49,7 +50,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
public void StructuredParseSuccess10()
{
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(jsonv10), contentType: null, extensionAttributes: null);
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", cloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), cloudEvent.Source);
@@ -68,7 +69,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
var extension = CloudEventAttribute.CreateExtension("comexampleextension2", CloudEventAttributeType.Integer);
var jsonFormatter = new JsonEventFormatter();
- var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10), new[] { extension });
+ var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(jsonv10), contentType: null, new[] { extension });
// Instead of getting it as a string (as before), we now have it as an integer.
Assert.Equal(10, cloudEvent["comexampleextension2"]);
}