Rework CloudEventFormatter
This is largely a matter of clarifying responsibilities and adding documentation to CloudEventFormatter and our current implementations. Additionally, the content type of the message is now passed to structured-mode decoding methods to assist with aspects such as character encodings. The methods have been renamed to make it clearer that it's the message that uses structured or binary mode, not the event itself. (There's no such thing as a "structured mode CloudEvent" or a "binary mode CloudEvent".) Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
parent
951a8c5c42
commit
23a3cd6e69
|
@ -6,6 +6,7 @@ using Amqp;
|
|||
using Amqp.Types;
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Net.Mime;
|
||||
|
||||
namespace CloudNative.CloudEvents.Amqp
|
||||
{
|
||||
|
@ -16,16 +17,16 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
internal const string SpecVersionAmqpHeader = AmqpHeaderPrefix + "specversion";
|
||||
|
||||
public static bool IsCloudEvent(this Message message) =>
|
||||
(message.Properties.ContentType is Symbol contentType && contentType.ToString().StartsWith(CloudEvent.MediaType)) ||
|
||||
HasCloudEventsContentType(message, out _) ||
|
||||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
|
||||
|
||||
public static CloudEvent ToCloudEvent(this Message message,
|
||||
CloudEventFormatter formatter,
|
||||
params CloudEventAttribute[] extensionAttributes)
|
||||
{
|
||||
if (HasCloudEventsContentType(message))
|
||||
if (HasCloudEventsContentType(message, out var contentType))
|
||||
{
|
||||
return formatter.DecodeStructuredEvent(new MemoryStream((byte[])message.Body), extensionAttributes);
|
||||
return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[])message.Body), new ContentType(contentType), extensionAttributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -88,7 +89,10 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
}
|
||||
|
||||
// TODO: Check that it really is meant to be case-sensitive. (Original code was inconsistent.)
|
||||
private static bool HasCloudEventsContentType(Message message) =>
|
||||
message.Properties.ContentType is Symbol contentType && contentType.ToString().StartsWith(CloudEvent.MediaType);
|
||||
private static bool HasCloudEventsContentType(Message message, out string contentType)
|
||||
{
|
||||
contentType = (message.Properties.ContentType as Symbol)?.ToString();
|
||||
return contentType?.StartsWith(CloudEvent.MediaType) == true;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -22,7 +22,7 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
{
|
||||
BodySection = new Data
|
||||
{
|
||||
Binary = formatter.EncodeStructuredEvent(cloudEvent, out var contentType)
|
||||
Binary = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType)
|
||||
};
|
||||
Properties = new Properties { ContentType = contentType.MediaType };
|
||||
ApplicationProperties = new ApplicationProperties();
|
||||
|
|
|
@ -7,6 +7,7 @@ using Microsoft.AspNetCore.Http;
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net.Mime;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents
|
||||
|
@ -14,13 +15,13 @@ namespace CloudNative.CloudEvents
|
|||
public static class HttpRequestExtension
|
||||
{
|
||||
/// <summary>
|
||||
/// Converts this HTTP request into a CloudEvent object, with the given extensions,
|
||||
/// overriding the formatter.
|
||||
/// Converts this HTTP request into a CloudEvent object.
|
||||
/// </summary>
|
||||
/// <param name="httpRequest">HTTP request</param>
|
||||
/// <param name="formatter">The event formatter to use to process the request body.</param>
|
||||
/// <param name="extensions">List of extension instances</param>
|
||||
/// <returns>A CloudEvent instance or 'null' if the request message doesn't hold a CloudEvent</returns>
|
||||
/// <param name="httpRequest">The HTTP request to decode. Must not be null.</param>
|
||||
/// <param name="formatter">The event formatter to use to process the request body. Must not be null.</param>
|
||||
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
|
||||
/// <returns>The decoded CloudEvent.</returns>
|
||||
/// <exception cref="ArgumentException">The request does not contain a CloudEvent.</exception>
|
||||
public static async ValueTask<CloudEvent> ReadCloudEventAsync(this HttpRequest httpRequest,
|
||||
CloudEventFormatter formatter,
|
||||
params CloudEventAttribute[] extensionAttributes)
|
||||
|
@ -28,7 +29,7 @@ namespace CloudNative.CloudEvents
|
|||
if (HasCloudEventsContentType(httpRequest))
|
||||
{
|
||||
// TODO: Handle formatter being null
|
||||
return await formatter.DecodeStructuredEventAsync(httpRequest.Body, extensionAttributes).ConfigureAwait(false);
|
||||
return await formatter.DecodeStructuredModeMessageAsync(httpRequest.Body, MimeUtilities.CreateContentTypeOrNull(httpRequest.ContentType), extensionAttributes).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -59,13 +60,11 @@ namespace CloudNative.CloudEvents
|
|||
cloudEvent.DataContentType = httpRequest.ContentType;
|
||||
if (httpRequest.Body is Stream body)
|
||||
{
|
||||
// TODO: This is a bit ugly.
|
||||
// TODO: This is a bit ugly. We have code in BinaryDataUtilities to handle this, but
|
||||
// we'd rather not expose it...
|
||||
var memoryStream = new MemoryStream();
|
||||
await body.CopyToAsync(memoryStream).ConfigureAwait(false);
|
||||
if (memoryStream.Length != 0)
|
||||
{
|
||||
cloudEvent.Data = formatter.DecodeData(memoryStream.ToArray(), cloudEvent.DataContentType);
|
||||
}
|
||||
formatter.DecodeBinaryModeEventData(memoryStream.ToArray(), cloudEvent);
|
||||
}
|
||||
return cloudEvent;
|
||||
}
|
||||
|
|
|
@ -13,10 +13,27 @@ using System.Net.Mime;
|
|||
namespace CloudNative.CloudEvents
|
||||
{
|
||||
/// <summary>
|
||||
/// Formatter that implements the Avro Event Format
|
||||
/// Formatter that implements the Avro Event Format.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// This event formatter currently only supports structured-mode messages.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// When encoding a CloudEvent, the data must be serializable as described in the
|
||||
/// <a href="https://github.com/cloudevents/spec/blob/v1.0.1/avro-format.md#3-data">CloudEvents Avro Event
|
||||
/// Format specification</a>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// When decoding a CloudEvent, the <see cref="CloudEvent.Data"/> property is populated directly from the
|
||||
/// Avro record, so the value will have the natural Avro deserialization type for that data (which may
|
||||
/// not be exactly the same as the type that was serialized).
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class AvroEventFormatter : CloudEventFormatter
|
||||
{
|
||||
private const string MediaTypeSuffix = "+avro";
|
||||
private const string AttributeName = "attribute";
|
||||
private const string DataName = "data";
|
||||
private static readonly RecordSchema avroSchema;
|
||||
private static readonly DefaultReader avroReader;
|
||||
|
@ -24,42 +41,46 @@ namespace CloudNative.CloudEvents
|
|||
|
||||
static AvroEventFormatter()
|
||||
{
|
||||
// we're going to confidently assume that the embedded schema works. If not, type initialization
|
||||
// will fail and that's okay since the type is useless without the proper schema
|
||||
// We're going to confidently assume that the embedded schema works. If not, type initialization
|
||||
// will fail and that's okay since the type is useless without the proper schema.
|
||||
using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
|
||||
{
|
||||
avroSchema = (RecordSchema)RecordSchema.Parse(sr.ReadToEnd());
|
||||
avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd());
|
||||
}
|
||||
avroReader = new DefaultReader(avroSchema, avroSchema);
|
||||
avroWriter = new DefaultWriter(avroSchema);
|
||||
}
|
||||
public const string MediaTypeSuffix = "+avro";
|
||||
|
||||
public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
public override CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var decoder = new BinaryDecoder(data);
|
||||
var rawEvent = avroReader.Read<GenericRecord>(null, decoder);
|
||||
return DecodeGenericRecord(rawEvent, extensionAttributes);
|
||||
}
|
||||
|
||||
public override CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
DecodeStructuredEvent(new MemoryStream(data), extensionAttributes);
|
||||
public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
|
||||
|
||||
private CloudEvent DecodeGenericRecord(GenericRecord record, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
if (!record.TryGetValue("attribute", out var attrObj))
|
||||
if (!record.TryGetValue(AttributeName, out var attrObj))
|
||||
{
|
||||
return null;
|
||||
throw new ArgumentException($"Record has no '{AttributeName}' field");
|
||||
}
|
||||
IDictionary<string, object> recordAttributes = (IDictionary<string, object>)attrObj;
|
||||
|
||||
CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;
|
||||
if (recordAttributes.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var versionId) &&
|
||||
versionId is string versionIdString)
|
||||
if (!recordAttributes.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var versionId) ||
|
||||
!(versionId is string versionIdString))
|
||||
{
|
||||
specVersion = CloudEventsSpecVersion.FromVersionId(versionIdString);
|
||||
throw new ArgumentException("Specification version attribute is missing");
|
||||
}
|
||||
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);
|
||||
CloudEventsSpecVersion version = CloudEventsSpecVersion.FromVersionId(versionIdString);
|
||||
if (version is null)
|
||||
{
|
||||
throw new ArgumentException($"Unsupported CloudEvents spec version '{versionIdString}'");
|
||||
}
|
||||
|
||||
var cloudEvent = new CloudEvent(version, extensionAttributes);
|
||||
cloudEvent.Data = record.TryGetValue(DataName, out var data) ? data : null;
|
||||
|
||||
foreach (var keyValuePair in recordAttributes)
|
||||
|
@ -78,6 +99,7 @@ namespace CloudNative.CloudEvents
|
|||
|
||||
// The Avro schema allows the value to be a Boolean, integer, string or bytes.
|
||||
// Timestamps and URIs are represented as strings, so we just use SetAttributeFromString to handle those.
|
||||
// TODO: This does mean that any extensions of these types must have been registered beforehand.
|
||||
if (value is bool || value is int || value is byte[])
|
||||
{
|
||||
cloudEvent[key] = value;
|
||||
|
@ -95,12 +117,13 @@ namespace CloudNative.CloudEvents
|
|||
return cloudEvent;
|
||||
}
|
||||
|
||||
public override byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
|
||||
public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
|
||||
{
|
||||
contentType = new ContentType(CloudEvent.MediaType+AvroEventFormatter.MediaTypeSuffix);
|
||||
contentType = new ContentType(CloudEvent.MediaType + MediaTypeSuffix);
|
||||
|
||||
// We expect the Avro encoded to detect data types that can't be represented in the schema.
|
||||
GenericRecord record = new GenericRecord(avroSchema);
|
||||
record.Add(DataName, SerializeData(cloudEvent.Data));
|
||||
record.Add(DataName, cloudEvent.Data);
|
||||
var recordAttributes = new Dictionary<string, object>();
|
||||
recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
|
||||
|
||||
|
@ -114,33 +137,18 @@ namespace CloudNative.CloudEvents
|
|||
: attribute.Format(value);
|
||||
recordAttributes[attribute.Name] = avroValue;
|
||||
}
|
||||
record.Add("attribute", recordAttributes);
|
||||
record.Add(AttributeName, recordAttributes);
|
||||
MemoryStream memStream = new MemoryStream();
|
||||
BinaryEncoder encoder = new BinaryEncoder(memStream);
|
||||
avroWriter.Write(record, encoder);
|
||||
return new Span<byte>(memStream.GetBuffer(), 0, (int)memStream.Length).ToArray();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Convert data into a suitable format for inclusion in an Avro record.
|
||||
/// TODO: Asynchronous version of this...
|
||||
/// </summary>
|
||||
private static object SerializeData(object data)
|
||||
{
|
||||
if (data is Stream stream)
|
||||
{
|
||||
var ms = new MemoryStream();
|
||||
stream.CopyTo(ms);
|
||||
return ms.ToArray();
|
||||
}
|
||||
return data;
|
||||
return memStream.ToArray();
|
||||
}
|
||||
|
||||
// TODO: Validate that this is correct...
|
||||
public override byte[] EncodeData(object value) =>
|
||||
public override byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent) =>
|
||||
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
|
||||
|
||||
public override object DecodeData(byte[] value, string contentType) =>
|
||||
public override void DecodeBinaryModeEventData(byte[] value, CloudEvent cloudEvent) =>
|
||||
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ using Confluent.Kafka;
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net.Mime;
|
||||
using System.Text;
|
||||
|
||||
namespace CloudNative.CloudEvents.Kafka
|
||||
|
@ -38,7 +39,7 @@ namespace CloudNative.CloudEvents.Kafka
|
|||
// Structured mode
|
||||
if (contentType?.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase) == true)
|
||||
{
|
||||
cloudEvent = eventFormatter.DecodeStructuredEvent(message.Value, extensionAttributes);
|
||||
cloudEvent = eventFormatter.DecodeStructuredModeMessage(message.Value, new ContentType(contentType), extensionAttributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -34,7 +34,7 @@ namespace CloudNative.CloudEvents.Kafka
|
|||
|
||||
if (contentMode == ContentMode.Structured)
|
||||
{
|
||||
Value = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
Value = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
Headers.Add(KafkaContentTypeAttributeName, Encoding.UTF8.GetBytes(contentType.MediaType));
|
||||
}
|
||||
else
|
||||
|
|
|
@ -11,7 +11,8 @@ namespace CloudNative.CloudEvents.Mqtt
|
|||
public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
|
||||
CloudEventFormatter eventFormatter, params CloudEventAttribute[] extensionAttributes)
|
||||
{
|
||||
return eventFormatter.DecodeStructuredEvent(message.Payload, extensionAttributes);
|
||||
// TODO: Determine if there's a sensible content type we should apply.
|
||||
return eventFormatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -11,7 +11,7 @@ namespace CloudNative.CloudEvents.Mqtt
|
|||
{
|
||||
public MqttCloudEventMessage(CloudEvent cloudEvent, CloudEventFormatter formatter)
|
||||
{
|
||||
this.Payload = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
this.Payload = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,77 +14,133 @@ using System.Threading.Tasks;
|
|||
namespace CloudNative.CloudEvents.NewtonsoftJson
|
||||
{
|
||||
/// <summary>
|
||||
/// Formatter that implements the JSON Event Format
|
||||
/// Formatter that implements the JSON Event Format.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// When encoding CloudEvent data, the behavior depends on the data content type of the CloudEvent
|
||||
/// and the type of the <see cref="CloudEvent.Data"/> property value, following the rules below.
|
||||
/// </para>
|
||||
/// <list type="bullet">
|
||||
/// <item><description>
|
||||
/// If the data value is null, the content is empty for a binary mode message, and neither the "data"
|
||||
/// nor "data_base64" property is populated in a structured mode message.
|
||||
/// </description></item>
|
||||
/// <item><description>
|
||||
/// If the data content type is absent or has a media type of "application/json", the data is encoded as JSON.
|
||||
/// If the data is already a <see cref="JToken"/>, that is serialized directly as JSON. Otherwise, the data
|
||||
/// is converted using the <see cref="JsonSerializer"/> passed into the constructor, or a
|
||||
/// default serializer.
|
||||
/// </description></item>
|
||||
/// <item><description>
|
||||
/// Otherwise, if the data content type has a media type beginning with "text/" and the data value is a string,
|
||||
/// the data is serialized as a string.
|
||||
/// </description></item>
|
||||
/// <item><description>
|
||||
/// Otherwise, if the data value is a byte array, it is serialized either directly as binary data
|
||||
/// (for binary mode messages) or as base64 data (for structured mode messages).
|
||||
/// </description></item>
|
||||
/// <item><description>
|
||||
/// Otherwise, the encoding operation fails.
|
||||
/// </description></item>
|
||||
/// </list>
|
||||
/// <para>
|
||||
/// When decoding CloudEvent data, the following rules are used:
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// In a structured mode message, any data is either binary data within the "data_base64" property value,
|
||||
/// or is a JSON token as the "data" property value. Binary data is represented as a byte array.
|
||||
/// A JSON token is decoded as a string if is just a string value and the data content type is specified
|
||||
/// and has a media type beginning with "text/". A JSON token representing the null value always
|
||||
/// leads to a null data result. In any other situation, the JSON token is preserved as a <see cref="JToken"/>
|
||||
/// that can be used for further deserialization (e.g. to a specific CLR type).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// In a binary mode message, the data is parsed based on the content type of the message. When the content
|
||||
/// type is absent or has a media type of "application/json", the data is parsed as JSON, with the result as
|
||||
/// a <see cref="JToken"/>. When the content type has a media type beginning with "text/", the data is parsed
|
||||
/// as a string. In all other cases, the data is left as a byte array.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class JsonEventFormatter : CloudEventFormatter
|
||||
{
|
||||
private const string JsonMediaType = "application/json";
|
||||
private const string DataBase64 = "data_base64";
|
||||
private const string Data = "data";
|
||||
public const string MediaTypeSuffix = "+json";
|
||||
private const string MediaTypeSuffix = "+json";
|
||||
|
||||
public override async Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
private readonly JsonSerializer serializer;
|
||||
|
||||
/// <summary>
|
||||
/// Creates a JsonEventFormatter that uses a default <see cref="JsonSerializer"/>.
|
||||
/// </summary>
|
||||
public JsonEventFormatter() : this(JsonSerializer.CreateDefault())
|
||||
{
|
||||
var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true))
|
||||
{
|
||||
DateParseHandling = DateParseHandling.None
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Creates a JsonEventFormatter that uses the specified <see cref="JsonSerializer"/>
|
||||
/// to serialize objects as JSON.
|
||||
/// </summary>
|
||||
public JsonEventFormatter(JsonSerializer serializer)
|
||||
{
|
||||
this.serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
|
||||
}
|
||||
|
||||
public override async Task<CloudEvent> DecodeStructuredModeMessageAsync(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var jsonReader = CreateJsonReader(data, contentType.GetEncoding());
|
||||
var jObject = await JObject.LoadAsync(jsonReader).ConfigureAwait(false);
|
||||
return DecodeJObject(jObject, extensionAttributes);
|
||||
}
|
||||
|
||||
public override CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes = null)
|
||||
public override CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true))
|
||||
{
|
||||
DateParseHandling = DateParseHandling.None
|
||||
};
|
||||
var jsonReader = CreateJsonReader(data, contentType.GetEncoding());
|
||||
var jObject = JObject.Load(jsonReader);
|
||||
return DecodeJObject(jObject, extensionAttributes);
|
||||
}
|
||||
|
||||
public override CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<CloudEventAttribute> extensionAttributes = null) =>
|
||||
DecodeStructuredEvent(new MemoryStream(data), extensionAttributes);
|
||||
public override CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
DecodeStructuredModeMessage(new MemoryStream(data), contentType, extensionAttributes);
|
||||
|
||||
private CloudEvent DecodeJObject(JObject jObject, IEnumerable<CloudEventAttribute> extensionAttributes = null)
|
||||
{
|
||||
CloudEventsSpecVersion specVersion = CloudEventsSpecVersion.Default;
|
||||
if (jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken))
|
||||
if (!jObject.TryGetValue(CloudEventsSpecVersion.SpecVersionAttribute.Name, out var specVersionToken)
|
||||
|| specVersionToken.Type != JTokenType.String)
|
||||
{
|
||||
string versionId = (string)specVersionToken;
|
||||
specVersion = CloudEventsSpecVersion.FromVersionId(versionId);
|
||||
// TODO: Throw if specVersion is null?
|
||||
throw new ArgumentException($"Structured mode content does not represent a CloudEvent");
|
||||
}
|
||||
var specVersion = CloudEventsSpecVersion.FromVersionId((string) specVersionToken);
|
||||
if (specVersion is null)
|
||||
{
|
||||
throw new ArgumentException($"Unsupported CloudEvents spec version '{(string)specVersionToken}'");
|
||||
}
|
||||
|
||||
var cloudEvent = new CloudEvent(specVersion, extensionAttributes);
|
||||
PopulateAttributesFromStructuredEvent(cloudEvent, jObject);
|
||||
PopulateDataFromStructuredEvent(cloudEvent, jObject);
|
||||
|
||||
return cloudEvent;
|
||||
}
|
||||
|
||||
private void PopulateAttributesFromStructuredEvent(CloudEvent cloudEvent, JObject jObject)
|
||||
{
|
||||
foreach (var keyValuePair in jObject)
|
||||
{
|
||||
var key = keyValuePair.Key;
|
||||
var value = keyValuePair.Value;
|
||||
|
||||
// Skip the spec version attribute, which we've already taken account of.
|
||||
if (key == CloudEventsSpecVersion.SpecVersionAttribute.Name)
|
||||
// Data is handled later, when everything else (importantly, the data content type)
|
||||
// has been populated.
|
||||
if (key == CloudEventsSpecVersion.SpecVersionAttribute.Name ||
|
||||
key == DataBase64 ||
|
||||
key == Data)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: Is the data_base64 name version-specific?
|
||||
if (specVersion == CloudEventsSpecVersion.V1_0 && key == DataBase64)
|
||||
{
|
||||
// Handle base64 encoded binaries
|
||||
cloudEvent.Data = Convert.FromBase64String((string)value);
|
||||
continue;
|
||||
}
|
||||
if (key == Data)
|
||||
{
|
||||
// FIXME: Deserialize where appropriate.
|
||||
// Consider whether there are any options here to consider beyond "string" and "object".
|
||||
// (e.g. arrays, numbers etc).
|
||||
// Note: the cast to "object" is important here, otherwise the string branch is implicitly
|
||||
// converted back to JToken...
|
||||
cloudEvent.Data = value.Type == JTokenType.String ? (string) value : (object) value;
|
||||
continue;
|
||||
}
|
||||
|
||||
var attribute = cloudEvent.GetAttribute(key);
|
||||
|
||||
// Set the attribute in the event, taking account of mismatches between the type in the JObject
|
||||
|
@ -96,18 +152,53 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
|
|||
string attributeValue = value.Type switch
|
||||
{
|
||||
JTokenType.String => (string)value,
|
||||
JTokenType.Boolean => CloudEventAttributeType.Boolean.Format((bool)value),
|
||||
JTokenType.Null => null, // TODO: Check we want to do this. It's a bit weird.
|
||||
JTokenType.Integer => CloudEventAttributeType.Integer.Format((int)value),
|
||||
_ => throw new ArgumentException($"Invalid token type '{value.Type}' for CloudEvent attribute")
|
||||
};
|
||||
|
||||
// Note: we *could* infer an extension type of integer and Boolean, but not other extension types.
|
||||
// (We don't want to assume that everything that looks like a timestamp is a timestamp, etc.)
|
||||
// Stick to strings for consistency.
|
||||
cloudEvent.SetAttributeFromString(key, attributeValue);
|
||||
}
|
||||
|
||||
return cloudEvent;
|
||||
}
|
||||
|
||||
public override byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
|
||||
private void PopulateDataFromStructuredEvent(CloudEvent cloudEvent, JObject jObject)
|
||||
{
|
||||
jObject.TryGetValue(Data, out var dataToken);
|
||||
jObject.TryGetValue(DataBase64, out var dataBase64Token);
|
||||
if (dataToken is null && dataBase64Token is null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (dataToken is object && dataBase64Token is object)
|
||||
{
|
||||
throw new ArgumentException($"Structured mode content cannot contain both '{Data}' and '{DataBase64}' properties.");
|
||||
}
|
||||
if (dataBase64Token is object)
|
||||
{
|
||||
if (dataBase64Token.Type != JTokenType.String)
|
||||
{
|
||||
throw new ArgumentException($"Structured mode property '{DataBase64}' must be a string, when present.");
|
||||
}
|
||||
cloudEvent.Data = Convert.FromBase64String((string) dataBase64Token);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (dataToken.Type == JTokenType.Null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
// Convert JSON string tokens to string values when the content type suggests that's appropriate,
|
||||
// otherwise leave the token as it is.
|
||||
cloudEvent.Data = dataToken.Type == JTokenType.String && cloudEvent.DataContentType?.StartsWith("text/") == true
|
||||
? (string) dataToken
|
||||
: (object) dataToken; // Deliberately cast to object to avoid any implicit conversions
|
||||
}
|
||||
}
|
||||
|
||||
public override byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType)
|
||||
{
|
||||
contentType = new ContentType("application/cloudevents+json")
|
||||
{
|
||||
|
@ -115,21 +206,23 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
|
|||
};
|
||||
|
||||
JObject jObject = new JObject();
|
||||
jObject[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
|
||||
var attributes = cloudEvent.GetPopulatedAttributes();
|
||||
foreach (var keyValuePair in attributes)
|
||||
{
|
||||
jObject[keyValuePair.Key.Name] = JToken.FromObject(keyValuePair.Value);
|
||||
}
|
||||
|
||||
// FIXME: This is all a bit arbitrary.
|
||||
if (cloudEvent.Data is object)
|
||||
{
|
||||
// FIXME: This assumes there's nothing beyond the media type...
|
||||
if (cloudEvent.DataContentType == "application/json")
|
||||
ContentType dataContentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
|
||||
if (dataContentType.MediaType == JsonMediaType)
|
||||
{
|
||||
jObject[Data] = JToken.FromObject(cloudEvent.Data);
|
||||
jObject[Data] = cloudEvent.Data is JToken token
|
||||
? token
|
||||
: JToken.FromObject(cloudEvent.Data);
|
||||
}
|
||||
else if (cloudEvent.Data is string text && cloudEvent.DataContentType?.StartsWith("text/") == true)
|
||||
else if (cloudEvent.Data is string text && dataContentType.MediaType.StartsWith("text/"))
|
||||
{
|
||||
jObject[Data] = text;
|
||||
}
|
||||
|
@ -139,7 +232,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
|
|||
}
|
||||
else
|
||||
{
|
||||
throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type {cloudEvent.DataContentType}");
|
||||
throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,39 +240,54 @@ namespace CloudNative.CloudEvents.NewtonsoftJson
|
|||
}
|
||||
|
||||
// TODO: How should the caller know whether the result is "raw" or should be stored in data_base64?
|
||||
public override byte[] EncodeData(object value)
|
||||
public override byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent)
|
||||
{
|
||||
// TODO: Check this is what we want.
|
||||
// In particular, if this is just other text or binary data, rather than JSON, what does it
|
||||
// mean to have a JSON event format?
|
||||
string json = value switch
|
||||
if (cloudEvent.Data is null)
|
||||
{
|
||||
JToken token => token.ToString(), // Formatting?
|
||||
string text => text,
|
||||
byte[] data => Convert.ToBase64String(data),
|
||||
null => null,
|
||||
_ => JsonConvert.SerializeObject(value)
|
||||
};
|
||||
return json is null ? new byte[0] : Encoding.UTF8.GetBytes(json);
|
||||
return Array.Empty<byte>();
|
||||
}
|
||||
ContentType contentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
|
||||
if (contentType.MediaType == JsonMediaType)
|
||||
{
|
||||
string json = JsonConvert.SerializeObject(cloudEvent.Data);
|
||||
return contentType.GetEncoding().GetBytes(json);
|
||||
}
|
||||
if (contentType.MediaType.StartsWith("text/") && cloudEvent.Data is string text)
|
||||
{
|
||||
return contentType.GetEncoding().GetBytes(text);
|
||||
}
|
||||
if (cloudEvent.Data is byte[] bytes)
|
||||
{
|
||||
return bytes;
|
||||
}
|
||||
throw new ArgumentException($"{nameof(JsonEventFormatter)} cannot serialize data of type {cloudEvent.Data.GetType()} with content type '{cloudEvent.DataContentType}'");
|
||||
}
|
||||
|
||||
public override object DecodeData(byte[] value, string contentType)
|
||||
public override void DecodeBinaryModeEventData(byte[] value, CloudEvent cloudEvent)
|
||||
{
|
||||
if (contentType == "application/json")
|
||||
ContentType contentType = new ContentType(cloudEvent.DataContentType ?? JsonMediaType);
|
||||
|
||||
Encoding encoding = contentType.GetEncoding();
|
||||
|
||||
if (contentType.MediaType == JsonMediaType)
|
||||
{
|
||||
var jsonReader = new JsonTextReader(new StreamReader(new MemoryStream(value), Encoding.UTF8, true, 8192, true))
|
||||
{
|
||||
DateParseHandling = DateParseHandling.DateTimeOffset
|
||||
};
|
||||
return JToken.Load(jsonReader);
|
||||
var jsonReader = CreateJsonReader(new MemoryStream(value), encoding);
|
||||
cloudEvent.Data = JToken.Load(jsonReader);
|
||||
}
|
||||
else if (contentType?.StartsWith("text/") == true)
|
||||
else if (contentType.MediaType.StartsWith("text/") == true)
|
||||
{
|
||||
// FIXME: Even if we want to do this, we really need to know if there's a content encoding.
|
||||
return Encoding.UTF8.GetString(value);
|
||||
cloudEvent.Data = encoding.GetString(value);
|
||||
}
|
||||
else
|
||||
{
|
||||
cloudEvent.Data = value;
|
||||
}
|
||||
// TODO: Clone?
|
||||
return value;
|
||||
}
|
||||
|
||||
private static JsonReader CreateJsonReader(Stream stream, Encoding encoding) =>
|
||||
new JsonTextReader(new StreamReader(stream, encoding ?? Encoding.UTF8, detectEncodingFromByteOrderMarks: true, bufferSize: 8192, leaveOpen: true))
|
||||
{
|
||||
DateParseHandling = DateParseHandling.None
|
||||
};
|
||||
}
|
||||
}
|
|
@ -11,61 +11,107 @@ using System.Threading.Tasks;
|
|||
namespace CloudNative.CloudEvents
|
||||
{
|
||||
/// <summary>
|
||||
/// Implemented by formatters
|
||||
/// Performs CloudEvent conversions as part of encoding and decoding messages for protocol bindings.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Event formatters are responsible for complete CloudEvent encoding and decoding for structured-mode messages (where
|
||||
/// all the CloudEvent information is represented within the message body), and data-only encoding and decoding
|
||||
/// for binary-mode messages (where CloudEvent attributes are represented in message metadata, and the CloudEvent data
|
||||
/// is represented in the message body).
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Each event formatter type is responsible for documenting what types of value are acceptable for the <see cref="CloudEvent.Data"/>
|
||||
/// property in CloudEvents it is asked to encode, and likewise what types of value will be present in the same property
|
||||
/// when it is asked to decode a message. Event formatters should aim to be as consistent as possible with respect to data handling
|
||||
/// between structured and binary modes, although this is not always possible as the structured mode representation may contain
|
||||
/// more hints around how to interpret the data than the binary mode representation. Inconsistencies should be carefully
|
||||
/// noted so that consumers can write robust code.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// An event format is often naturally associated with a particular kind of data, but it is not limited to working with
|
||||
/// that kind. For example, the JSON event format allows JSON data to be stored particularly naturally within the structured-mode
|
||||
/// message body (which is itself JSON), but it is still able to handle arbitrary binary or text data.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public abstract class CloudEventFormatter
|
||||
{
|
||||
/// <summary>
|
||||
/// Decode a structured event from a stream. The default implementation copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredEvent(byte[], IEnumerable{CloudEventAttribute})"/>,
|
||||
/// Asynchronously decodes a CloudEvent from a structured-mode message body, represented as a stream. The default implementation copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredModeMessage(byte[], ContentType, IEnumerable{CloudEventAttribute})"/>
|
||||
/// but this can be overridden by event formatters that can decode a stream more efficiently.
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
/// <param name="extensions"></param>
|
||||
/// <returns></returns>
|
||||
public virtual CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
/// <param name="data">The data within the message body. Must not be null.</param>
|
||||
/// <param name="contentType">The content type of the message, or null if no content type is known.
|
||||
/// Typically this is a content type with a media type of "application/cloudevents"; the additional
|
||||
/// information such as the charset parameter may be needed in order to decode the data.</param>
|
||||
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
|
||||
/// <returns>The decoded CloudEvent.</returns>
|
||||
public virtual CloudEvent DecodeStructuredModeMessage(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var bytes = BinaryDataUtilities.ToByteArray(data);
|
||||
return DecodeStructuredEvent(bytes, extensionAttributes);
|
||||
return DecodeStructuredModeMessage(bytes, contentType, extensionAttributes);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decode a structured event from a stream. The default implementation asynchronously copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredEvent(byte[], IEnumerable{CloudEventAttribute})"/>,
|
||||
/// Decodes a CloudEvent from a structured-mode message body, represented as a stream. The default implementation asynchronously copies the
|
||||
/// content of the stream into a byte array before passing it to <see cref="DecodeStructuredModeMessage(byte[], ContentType, IEnumerable{CloudEventAttribute})"/>
|
||||
/// but this can be overridden by event formatters that can decode a stream more efficiently.
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
/// <param name="extensions"></param>
|
||||
/// <returns></returns>
|
||||
public virtual async Task<CloudEvent> DecodeStructuredEventAsync(Stream data, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
/// <param name="data">The data within the message body. Must not be null.</param>
|
||||
/// <param name="contentType">The content type of the message, or null if no content type is known.
|
||||
/// Typically this is a content type with a media type of "application/cloudevents"; the additional
|
||||
/// information such as the charset parameter may be needed in order to decode the data.</param>
|
||||
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
|
||||
/// <returns>The CloudEvent derived from the structured data.</returns>
|
||||
public virtual async Task<CloudEvent> DecodeStructuredModeMessageAsync(Stream data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
var bytes = await BinaryDataUtilities.ToByteArrayAsync(data).ConfigureAwait(false);
|
||||
return DecodeStructuredEvent(bytes, extensionAttributes);
|
||||
return DecodeStructuredModeMessage(bytes, contentType, extensionAttributes);
|
||||
}
|
||||
|
||||
// TODO: Remove either this one or the stream one? It seems unnecessary to have both.
|
||||
|
||||
/// <summary>
|
||||
/// Decode a structured event from a byte array
|
||||
/// Decodes a CloudEvent from a structured-mode message body, represented as a byte array.
|
||||
/// </summary>
|
||||
/// <param name="data"></param>
|
||||
/// <param name="extensions"></param>
|
||||
/// <returns></returns>
|
||||
public virtual CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
/// <param name="data">The data within the message body. Must not be null.</param>
|
||||
/// <param name="contentType">The content type of the message, or null if no content type is known.
|
||||
/// Typically this is a content type with a media type of "application/cloudevents"; the additional
|
||||
/// information such as the charset parameter may be needed in order to decode the data.</param>
|
||||
/// <param name="extensions">The extension attributes to use when populating the CloudEvent. May be null.</param>
|
||||
/// <returns>The CloudEvent derived from the structured data.</returns>
|
||||
public virtual CloudEvent DecodeStructuredModeMessage(byte[] data, ContentType contentType, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
throw new NotImplementedException();
|
||||
|
||||
/// <summary>
|
||||
/// Encode an structured event into a byte array
|
||||
/// Encodes a CloudEvent as the body of a structured-mode message.
|
||||
/// </summary>
|
||||
/// <param name="cloudEvent"></param>
|
||||
/// <param name="contentType"></param>
|
||||
/// <returns></returns>
|
||||
public virtual byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType) =>
|
||||
/// <param name="cloudEvent">The CloudEvent to encode. Must not be null.</param>
|
||||
/// <param name="contentType">On successful return, the content type of the structured-mode data.
|
||||
/// Must not be null (on return).</param>
|
||||
/// <returns>The structured-mode representation of the CloudEvent.</returns>
|
||||
public virtual byte[] EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) =>
|
||||
throw new NotImplementedException();
|
||||
|
||||
// TODO: Work out whether this is what we want, and whether to potentially
|
||||
// separate it into a separate interface.
|
||||
public virtual byte[] EncodeData(object value) => throw new NotImplementedException();
|
||||
public virtual object DecodeData(byte[] value, string contentType) => throw new NotImplementedException();
|
||||
/// <summary>
|
||||
/// Encodes the data from <paramref name="cloudEvent"/> in a manner suitable for a binary mode message.
|
||||
/// </summary>
|
||||
/// <exception cref="ArgumentException">The data in the given CloudEvent cannot be encoded by this
|
||||
/// event formatter.</exception>
|
||||
/// <returns>The binary-mode representation of the CloudEvent.</returns>
|
||||
public virtual byte[] EncodeBinaryModeEventData(CloudEvent cloudEvent) =>
|
||||
throw new NotImplementedException();
|
||||
|
||||
/// <summary>
|
||||
/// Decodes the given data obtained from a binary-mode message, populating the <see cref="CloudEvent.Data"/>
|
||||
/// property of <paramref name="cloudEvent"/>. Other attributes within the CloudEvent may be used to inform
|
||||
/// the interpretation of the data. This method is expected to be called after all other aspects of the CloudEvent
|
||||
/// have been populated.
|
||||
/// </summary>
|
||||
/// <param name="data">The data from the message. Must not be null, but may be empty.</param>
|
||||
/// <param name="cloudEvent">The CloudEvent whose Data property should be populated. Must not be null.</param>
|
||||
/// <exception cref="ArgumentException">The data in the given CloudEvent cannot be decoded by this
|
||||
/// event formatter.</exception>
|
||||
public virtual void DecodeBinaryModeEventData(byte[] data, CloudEvent cloudEvent) =>
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
}
|
|
@ -6,19 +6,20 @@ using System;
|
|||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Mime;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents.Http
|
||||
{
|
||||
// TODO: Do we really need to have a subclass here? How about a static factory method instead?
|
||||
|
||||
/// <summary>
|
||||
/// This class is for use with `HttpClient` and constructs content and headers for
|
||||
/// a HTTP request from a CloudEvent.
|
||||
/// </summary>
|
||||
public class CloudEventHttpContent : HttpContent
|
||||
{
|
||||
IInnerContent inner;
|
||||
private readonly InnerByteArrayContent inner;
|
||||
|
||||
/// <summary>
|
||||
/// Constructor
|
||||
|
@ -28,70 +29,44 @@ namespace CloudNative.CloudEvents.Http
|
|||
/// <param name="formatter">Event formatter</param>
|
||||
public CloudEventHttpContent(CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
|
||||
{
|
||||
if (contentMode == ContentMode.Structured)
|
||||
byte[] content;
|
||||
ContentType contentType;
|
||||
switch (contentMode)
|
||||
{
|
||||
inner = new InnerByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType));
|
||||
// This is optional in the specification, but can be useful.
|
||||
MapHeaders(cloudEvent, includeDataContentType: true);
|
||||
Headers.ContentType = new MediaTypeHeaderValue(contentType.MediaType);
|
||||
return;
|
||||
}
|
||||
case ContentMode.Structured:
|
||||
content = formatter.EncodeStructuredModeMessage(cloudEvent, out contentType);
|
||||
// This is optional in the specification, but can be useful.
|
||||
MapHeaders(cloudEvent, includeDataContentType: true);
|
||||
break;
|
||||
case ContentMode.Binary:
|
||||
content = formatter.EncodeBinaryModeEventData(cloudEvent);
|
||||
contentType = MimeUtilities.CreateContentTypeOrNull(cloudEvent.DataContentType);
|
||||
MapHeaders(cloudEvent, includeDataContentType: false);
|
||||
break;
|
||||
default:
|
||||
throw new ArgumentException($"Unsupported content mode: {contentMode}");
|
||||
|
||||
// TODO: Shouldn't we use the formatter in all cases? If I have a JSON formatter and
|
||||
// If we specify that the the data is a byte array, I'd expect to end up with a base64-encoded representation...
|
||||
if (cloudEvent.Data is byte[])
|
||||
{
|
||||
inner = new InnerByteArrayContent((byte[])cloudEvent.Data);
|
||||
}
|
||||
else if (cloudEvent.Data is string)
|
||||
inner = new InnerByteArrayContent(content);
|
||||
if (contentType is object)
|
||||
{
|
||||
inner = new InnerStringContent((string)cloudEvent.Data);
|
||||
Headers.ContentType = contentType.ToMediaTypeHeaderValue();
|
||||
}
|
||||
else if (cloudEvent.Data is Stream)
|
||||
{
|
||||
inner = new InnerStreamContent((Stream)cloudEvent.Data);
|
||||
}
|
||||
else
|
||||
{
|
||||
inner = new InnerByteArrayContent(formatter.EncodeData(cloudEvent.Data));
|
||||
}
|
||||
|
||||
// We don't require a data content type if there isn't any data.
|
||||
// We may not be able to tell whether the data is empty or not, but we're lenient
|
||||
// in that case.
|
||||
var dataContentType = cloudEvent.DataContentType;
|
||||
if (dataContentType is object)
|
||||
{
|
||||
var mediaType = new ContentType(dataContentType).MediaType;
|
||||
Headers.ContentType = new MediaTypeHeaderValue(mediaType);
|
||||
}
|
||||
else if (TryComputeLength(out var length) && length != 0L)
|
||||
else if (content.Length != 0)
|
||||
{
|
||||
throw new ArgumentException(Strings.ErrorContentTypeUnspecified, nameof(cloudEvent));
|
||||
}
|
||||
MapHeaders(cloudEvent, includeDataContentType: false);
|
||||
}
|
||||
|
||||
private interface IInnerContent
|
||||
{
|
||||
Task InnerSerializeToStreamAsync(Stream stream, TransportContext context);
|
||||
bool InnerTryComputeLength(out long length);
|
||||
}
|
||||
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) =>
|
||||
inner.InnerSerializeToStreamAsync(stream, context);
|
||||
|
||||
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
|
||||
{
|
||||
return inner.InnerSerializeToStreamAsync(stream, context);
|
||||
}
|
||||
|
||||
protected override bool TryComputeLength(out long length)
|
||||
{
|
||||
return inner.InnerTryComputeLength(out length);
|
||||
}
|
||||
protected override bool TryComputeLength(out long length) =>
|
||||
inner.InnerTryComputeLength(out length);
|
||||
|
||||
private void MapHeaders(CloudEvent cloudEvent, bool includeDataContentType)
|
||||
{
|
||||
Headers.Add(HttpUtilities.HttpHeaderPrefix + CloudEventsSpecVersion.SpecVersionAttribute.Name,
|
||||
HttpUtilities.EncodeHeaderValue(cloudEvent.SpecVersion.VersionId));
|
||||
Headers.Add(HttpUtilities.SpecVersionHttpHeader, HttpUtilities.EncodeHeaderValue(cloudEvent.SpecVersion.VersionId));
|
||||
foreach (var attributeAndValue in cloudEvent.GetPopulatedAttributes())
|
||||
{
|
||||
CloudEventAttribute attribute = attributeAndValue.Key;
|
||||
|
@ -115,7 +90,7 @@ namespace CloudNative.CloudEvents.Http
|
|||
/// This inner class is required to get around the 'protected'-ness of the
|
||||
/// override functions of HttpContent for enabling containment/delegation
|
||||
/// </summary>
|
||||
class InnerByteArrayContent : ByteArrayContent, IInnerContent
|
||||
class InnerByteArrayContent : ByteArrayContent
|
||||
{
|
||||
public InnerByteArrayContent(byte[] content) : base(content)
|
||||
{
|
||||
|
@ -131,47 +106,5 @@ namespace CloudNative.CloudEvents.Http
|
|||
return base.TryComputeLength(out length);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This inner class is required to get around the 'protected'-ness of the
|
||||
/// override functions of HttpContent for enabling containment/delegation
|
||||
/// </summary>
|
||||
class InnerStreamContent : StreamContent, IInnerContent
|
||||
{
|
||||
public InnerStreamContent(Stream content) : base(content)
|
||||
{
|
||||
}
|
||||
|
||||
public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
|
||||
{
|
||||
return base.SerializeToStreamAsync(stream, context);
|
||||
}
|
||||
|
||||
public bool InnerTryComputeLength(out long length)
|
||||
{
|
||||
return base.TryComputeLength(out length);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// This inner class is required to get around the 'protected'-ness of the
|
||||
/// override functions of HttpContent for enabling containment/delegation
|
||||
/// </summary>
|
||||
class InnerStringContent : StringContent, IInnerContent
|
||||
{
|
||||
public InnerStringContent(string content) : base(content)
|
||||
{
|
||||
}
|
||||
|
||||
public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
|
||||
{
|
||||
return base.SerializeToStreamAsync(stream, context);
|
||||
}
|
||||
|
||||
public bool InnerTryComputeLength(out long length)
|
||||
{
|
||||
return base.TryComputeLength(out length);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@ using System.Linq;
|
|||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Mime;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents.Http
|
||||
|
@ -102,7 +103,7 @@ namespace CloudNative.CloudEvents.Http
|
|||
{
|
||||
// FIXME: Handle no formatter being specified.
|
||||
var stream = await content.ReadAsStreamAsync().ConfigureAwait(false);
|
||||
return await formatter.DecodeStructuredEventAsync(stream, extensionAttributes).ConfigureAwait(false);
|
||||
return await formatter.DecodeStructuredModeMessageAsync(stream, content.Headers.ContentType.ToContentType(), extensionAttributes).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -136,7 +137,7 @@ namespace CloudNative.CloudEvents.Http
|
|||
// TODO: Should this just be the media type? We probably need to take a full audit of this...
|
||||
cloudEvent.DataContentType = content.Headers?.ContentType?.ToString();
|
||||
var data = await content.ReadAsByteArrayAsync().ConfigureAwait(false);
|
||||
cloudEvent.Data = formatter.DecodeData(data, cloudEvent.DataContentType);
|
||||
formatter.DecodeBinaryModeEventData(data, cloudEvent);
|
||||
}
|
||||
return cloudEvent;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
using System;
|
||||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Mime;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace CloudNative.CloudEvents.Http
|
||||
|
@ -30,17 +31,17 @@ namespace CloudNative.CloudEvents.Http
|
|||
{
|
||||
if (contentMode == ContentMode.Structured)
|
||||
{
|
||||
var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
var buffer = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
httpListenerResponse.ContentType = contentType.ToString();
|
||||
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
|
||||
return httpListenerResponse.OutputStream.WriteAsync(buffer, 0, buffer.Length);
|
||||
}
|
||||
|
||||
Stream stream = HttpUtilities.MapDataAttributeToStream(cloudEvent, formatter);
|
||||
|
||||
// TODO: Check the defaulting to JSON here...
|
||||
httpListenerResponse.ContentType = cloudEvent.DataContentType?.ToString() ?? "application/json";
|
||||
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
|
||||
return stream.CopyToAsync(httpListenerResponse.OutputStream);
|
||||
byte[] content = formatter.EncodeBinaryModeEventData(cloudEvent);
|
||||
return httpListenerResponse.OutputStream.WriteAsync(content, 0, content.Length);
|
||||
}
|
||||
|
||||
// TODO: Do we want this? It's not about CloudEvents...
|
||||
|
@ -97,14 +98,16 @@ namespace CloudNative.CloudEvents.Http
|
|||
/// <param name="httpListenerRequest">Listener request</param>
|
||||
/// <param name="formatter"></param>
|
||||
/// <param name="extensions">List of extension instances</param>
|
||||
/// <returns>A CloudEvent instance or 'null' if the request message doesn't hold a CloudEvent</returns>
|
||||
/// <returns>The CloudEvent corresponding to the given request.</returns>
|
||||
/// <exception cref="ArgumentException">The request does not represent a CloudEvent,
|
||||
/// or the event's specification version is not supported,
|
||||
/// or the event formatter cannot interpret it.</exception>
|
||||
public static CloudEvent ToCloudEvent(this HttpListenerRequest httpListenerRequest,
|
||||
CloudEventFormatter formatter, params CloudEventAttribute[] extensionAttributes)
|
||||
{
|
||||
if (HasCloudEventsContentType(httpListenerRequest))
|
||||
{
|
||||
// FIXME: Handle no formatter being specified.
|
||||
return formatter.DecodeStructuredEvent(httpListenerRequest.InputStream, extensionAttributes);
|
||||
return formatter.DecodeStructuredModeMessage(httpListenerRequest.InputStream, MimeUtilities.CreateContentTypeOrNull(httpListenerRequest.ContentType), extensionAttributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -132,16 +135,11 @@ namespace CloudNative.CloudEvents.Http
|
|||
cloudEvent.SetAttributeFromString(attributeName, attributeValue);
|
||||
}
|
||||
|
||||
// TODO: Check that this doesn't come through as a header already
|
||||
// The data content type should not have been set via a "ce-" header; instead,
|
||||
// it's in the regular content type.
|
||||
cloudEvent.DataContentType = httpListenerRequest.ContentType;
|
||||
|
||||
// TODO: This is a bit ugly.
|
||||
var memoryStream = new MemoryStream();
|
||||
httpListenerRequest.InputStream.CopyTo(memoryStream);
|
||||
if (memoryStream.Length != 0)
|
||||
{
|
||||
cloudEvent.Data = formatter.DecodeData(memoryStream.ToArray(), cloudEvent.DataContentType);
|
||||
}
|
||||
formatter.DecodeBinaryModeEventData(BinaryDataUtilities.ToByteArray(httpListenerRequest.InputStream), cloudEvent);
|
||||
return cloudEvent;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ using System;
|
|||
using System.IO;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Mime;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
|
@ -21,15 +23,6 @@ namespace CloudNative.CloudEvents.Http
|
|||
|
||||
public const string SpecVersionHttpHeader = HttpHeaderPrefix + "specversion";
|
||||
|
||||
internal static Stream MapDataAttributeToStream(CloudEvent cloudEvent, CloudEventFormatter formatter) =>
|
||||
cloudEvent.Data switch
|
||||
{
|
||||
byte[] bytes => new MemoryStream(bytes),
|
||||
string text => new MemoryStream(Encoding.UTF8.GetBytes(text)),
|
||||
Stream stream => stream,
|
||||
object other => new MemoryStream(formatter.EncodeData(other))
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Checks whether the given HTTP header name starts with "ce-", and if so, converts it into
|
||||
/// a lower-case attribute name.
|
||||
|
|
|
@ -29,16 +29,16 @@ namespace CloudNative.CloudEvents.Http
|
|||
{
|
||||
if (contentMode == ContentMode.Structured)
|
||||
{
|
||||
var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
var buffer = formatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
httpWebRequest.ContentType = contentType.ToString();
|
||||
await httpWebRequest.GetRequestStream().WriteAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
|
||||
return;
|
||||
}
|
||||
|
||||
Stream stream = HttpUtilities.MapDataAttributeToStream(cloudEvent, formatter);
|
||||
httpWebRequest.ContentType = cloudEvent.DataContentType?.ToString() ?? "application/json";
|
||||
MapAttributesToWebRequest(cloudEvent, httpWebRequest);
|
||||
await stream.CopyToAsync(httpWebRequest.GetRequestStream());
|
||||
byte[] content = formatter.EncodeBinaryModeEventData(cloudEvent);
|
||||
await httpWebRequest.GetRequestStream().WriteAsync(content, 0, content.Length).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
static void MapAttributesToWebRequest(CloudEvent cloudEvent, HttpWebRequest httpWebRequest)
|
||||
|
|
|
@ -5,8 +5,8 @@
|
|||
using CloudNative.CloudEvents.NewtonsoftJson;
|
||||
using System;
|
||||
using System.Net.Mime;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
using static CloudNative.CloudEvents.UnitTests.TestHelpers;
|
||||
|
||||
namespace CloudNative.CloudEvents.Avro.UnitTests
|
||||
|
@ -30,9 +30,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
|
|||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var avroFormatter = new AvroEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
|
||||
var avroData = avroFormatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
var cloudEvent2 = avroFormatter.DecodeStructuredEvent(avroData, new CloudEventAttribute[0]);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
|
||||
var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
var cloudEvent2 = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, extensionAttributes: null);
|
||||
|
||||
Assert.Equal(cloudEvent2.SpecVersion, cloudEvent.SpecVersion);
|
||||
Assert.Equal(cloudEvent2.Type, cloudEvent.Type);
|
||||
|
@ -48,9 +48,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
|
|||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var avroFormatter = new AvroEventFormatter();
|
||||
var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
|
||||
var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
|
||||
var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData, new CloudEventAttribute[0]);
|
||||
var cloudEventJ = jsonFormatter.DecodeStructuredModeText(jsonv10);
|
||||
var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEventJ, out var contentType);
|
||||
var cloudEvent = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, extensionAttributes: null);
|
||||
|
||||
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
|
||||
Assert.Equal("com.github.pull.create", cloudEvent.Type);
|
||||
|
@ -69,9 +69,9 @@ namespace CloudNative.CloudEvents.Avro.UnitTests
|
|||
var jsonFormatter = new JsonEventFormatter();
|
||||
var avroFormatter = new AvroEventFormatter();
|
||||
var extensionAttribute = CloudEventAttribute.CreateExtension("comexampleextension1", CloudEventAttributeType.String);
|
||||
var cloudEventJ = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10), new[] { extensionAttribute });
|
||||
var avroData = avroFormatter.EncodeStructuredEvent(cloudEventJ, out var contentType);
|
||||
var cloudEvent = avroFormatter.DecodeStructuredEvent(avroData, new[] { extensionAttribute });
|
||||
var cloudEventJ = jsonFormatter.DecodeStructuredModeText(jsonv10, new[] { extensionAttribute });
|
||||
var avroData = avroFormatter.EncodeStructuredModeMessage(cloudEventJ, out var contentType);
|
||||
var cloudEvent = avroFormatter.DecodeStructuredModeMessage(avroData, contentType, new[] { extensionAttribute });
|
||||
|
||||
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
|
||||
Assert.Equal("com.github.pull.create", cloudEvent.Type);
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
// Copyright 2021 Cloud Native Foundation.
|
||||
// Licensed under the Apache 2.0 license.
|
||||
// See LICENSE file in the project root for full license information.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace CloudNative.CloudEvents.UnitTests
|
||||
{
|
||||
/// <summary>
|
||||
/// Extension methods for CloudEventFormatters to simplify testing.
|
||||
/// Often in tests we have structured mode data as strings, and usually the content type isn't important,
|
||||
/// so it's useful to be able to just decode that string directly.
|
||||
/// </summary>
|
||||
internal static class CloudEventFormatterExtensions
|
||||
{
|
||||
internal static CloudEvent DecodeStructuredModeText(this CloudEventFormatter eventFormatter, string text) =>
|
||||
eventFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(text), contentType: null, extensionAttributes: null);
|
||||
|
||||
internal static CloudEvent DecodeStructuredModeText(this CloudEventFormatter eventFormatter, string text, IEnumerable<CloudEventAttribute> extensionAttributes) =>
|
||||
eventFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(text), contentType: null, extensionAttributes);
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
using CloudNative.CloudEvents.NewtonsoftJson;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
|
||||
namespace CloudNative.CloudEvents.Extensions.UnitTests
|
||||
{
|
||||
|
@ -27,7 +28,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void ParseJson()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), DistributedTracing.AllAttributes);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, DistributedTracing.AllAttributes);
|
||||
|
||||
Assert.Equal(SampleParent, cloudEvent[DistributedTracing.TraceParentAttribute]);
|
||||
Assert.Equal(SampleParent, cloudEvent.GetTraceParent());
|
||||
|
@ -39,9 +40,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void Transcode()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
|
||||
var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, DistributedTracing.AllAttributes);
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
|
||||
var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, DistributedTracing.AllAttributes);
|
||||
|
||||
Assert.Equal(SampleParent, cloudEvent[DistributedTracing.TraceParentAttribute]);
|
||||
Assert.Equal(SampleParent, cloudEvent.GetTraceParent());
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
using CloudNative.CloudEvents.NewtonsoftJson;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
|
||||
namespace CloudNative.CloudEvents.Extensions.UnitTests
|
||||
{
|
||||
|
@ -24,7 +25,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void ParseJson()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson);
|
||||
Assert.Equal("abc", cloudEvent["partitionkey"]);
|
||||
Assert.Equal("abc", cloudEvent[Partitioning.PartitionKeyAttribute]);
|
||||
Assert.Equal("abc", cloudEvent.GetPartitionKey());
|
||||
|
@ -34,9 +35,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void Transcode()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
|
||||
var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData);
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
|
||||
var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, null);
|
||||
|
||||
Assert.Equal("abc", cloudEvent["partitionkey"]);
|
||||
Assert.Equal("abc", cloudEvent[Partitioning.PartitionKeyAttribute]);
|
||||
|
|
|
@ -6,6 +6,7 @@ using CloudNative.CloudEvents.NewtonsoftJson;
|
|||
using System;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
|
||||
namespace CloudNative.CloudEvents.Extensions.UnitTests
|
||||
{
|
||||
|
@ -24,7 +25,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void SamplingParse()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), Sampling.AllAttributes);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, Sampling.AllAttributes);
|
||||
|
||||
Assert.Equal(1, cloudEvent["sampledrate"]);
|
||||
Assert.Equal(1, cloudEvent.GetSampledRate());
|
||||
|
@ -34,12 +35,12 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void SamplingJsonTranscode()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
|
||||
// Note that the value is just a string here, as we don't know the attribute type.
|
||||
Assert.Equal("1", cloudEvent1["sampledrate"]);
|
||||
|
||||
var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out _);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, Sampling.AllAttributes);
|
||||
var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, Sampling.AllAttributes);
|
||||
|
||||
// When parsing with the attributes in place, the value is propagated as an integer.
|
||||
Assert.Equal(1, cloudEvent["sampledrate"]);
|
||||
|
|
|
@ -6,6 +6,7 @@ using CloudNative.CloudEvents.NewtonsoftJson;
|
|||
using System;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
|
||||
namespace CloudNative.CloudEvents.Extensions.UnitTests
|
||||
{
|
||||
|
@ -25,7 +26,7 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void Parse()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson), Sequence.AllAttributes);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(sampleJson, Sequence.AllAttributes);
|
||||
|
||||
Assert.Equal("Integer", cloudEvent[Sequence.SequenceTypeAttribute]);
|
||||
Assert.Equal("25", cloudEvent[Sequence.SequenceAttribute]);
|
||||
|
@ -35,9 +36,9 @@ namespace CloudNative.CloudEvents.Extensions.UnitTests
|
|||
public void Transcode()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(sampleJson));
|
||||
var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent1, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(jsonData, Sequence.AllAttributes);
|
||||
var cloudEvent1 = jsonFormatter.DecodeStructuredModeText(sampleJson);
|
||||
var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent1, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType, Sequence.AllAttributes);
|
||||
|
||||
Assert.Equal("Integer", cloudEvent[Sequence.SequenceTypeAttribute]);
|
||||
Assert.Equal("25", cloudEvent[Sequence.SequenceAttribute]);
|
||||
|
|
|
@ -145,7 +145,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
async Task HttpStructuredClientReceiveTest()
|
||||
public async Task HttpStructuredClientReceiveTest()
|
||||
{
|
||||
string ctx = Guid.NewGuid().ToString();
|
||||
PendingRequests.TryAdd(ctx, async context =>
|
||||
|
@ -200,7 +200,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
|
|||
}
|
||||
|
||||
[Fact]
|
||||
async Task HttpStructuredClientSendTest()
|
||||
public async Task HttpStructuredClientSendTest()
|
||||
{
|
||||
var cloudEvent = new CloudEvent
|
||||
{
|
||||
|
@ -231,7 +231,7 @@ namespace CloudNative.CloudEvents.Http.UnitTests
|
|||
Assert.Equal("2018-04-05T17:31:00Z", headers["ce-time"]);
|
||||
// Note that datacontenttype is mapped in this case, but would not be included in binary mode.
|
||||
Assert.Equal("text/xml", headers["ce-datacontenttype"]);
|
||||
Assert.Equal("application/cloudevents+json", context.Request.ContentType);
|
||||
Assert.Equal("application/cloudevents+json; charset=utf-8", context.Request.ContentType);
|
||||
Assert.Equal("value", headers["ce-comexampleextension1"]);
|
||||
// The non-ASCII attribute value should have been URL-encoded using UTF-8 for the header.
|
||||
Assert.Equal("%C3%A6%C3%B8%C3%A5", headers["ce-utf8examplevalue"]);
|
||||
|
|
|
@ -6,6 +6,7 @@ using System;
|
|||
using System.Net.Mime;
|
||||
using System.Text;
|
||||
using Xunit;
|
||||
using static CloudNative.CloudEvents.UnitTests.CloudEventFormatterExtensions;
|
||||
using static CloudNative.CloudEvents.UnitTests.TestHelpers;
|
||||
|
||||
namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
|
||||
|
@ -30,11 +31,11 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
|
|||
public void ReserializeTest10()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
|
||||
var jsonData = jsonFormatter.EncodeStructuredEvent(cloudEvent, out var contentType);
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
|
||||
var jsonData = jsonFormatter.EncodeStructuredModeMessage(cloudEvent, out var contentType);
|
||||
Assert.Equal("application/cloudevents+json", contentType.MediaType);
|
||||
|
||||
var cloudEvent2 = jsonFormatter.DecodeStructuredEvent(jsonData);
|
||||
var cloudEvent2 = jsonFormatter.DecodeStructuredModeMessage(jsonData, contentType: null, Array.Empty<CloudEventAttribute>());
|
||||
|
||||
Assert.Equal(cloudEvent2.SpecVersion, cloudEvent.SpecVersion);
|
||||
Assert.Equal(cloudEvent2.Type, cloudEvent.Type);
|
||||
|
@ -49,7 +50,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
|
|||
public void StructuredParseSuccess10()
|
||||
{
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10));
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(jsonv10), contentType: null, extensionAttributes: null);
|
||||
Assert.Equal(CloudEventsSpecVersion.V1_0, cloudEvent.SpecVersion);
|
||||
Assert.Equal("com.github.pull.create", cloudEvent.Type);
|
||||
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), cloudEvent.Source);
|
||||
|
@ -68,7 +69,7 @@ namespace CloudNative.CloudEvents.NewtonsoftJson.UnitTests
|
|||
var extension = CloudEventAttribute.CreateExtension("comexampleextension2", CloudEventAttributeType.Integer);
|
||||
|
||||
var jsonFormatter = new JsonEventFormatter();
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredEvent(Encoding.UTF8.GetBytes(jsonv10), new[] { extension });
|
||||
var cloudEvent = jsonFormatter.DecodeStructuredModeMessage(Encoding.UTF8.GetBytes(jsonv10), contentType: null, new[] { extension });
|
||||
// Instead of getting it as a string (as before), we now have it as an integer.
|
||||
Assert.Equal(10, cloudEvent["comexampleextension2"]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue