From 555063c0badb2b8b491149a433f867eb6fa7008e Mon Sep 17 00:00:00 2001 From: clemensv Date: Fri, 23 Nov 2018 16:25:48 +0100 Subject: [PATCH] Binary and Structured JSON/HTTP support with initial test coverage Signed-off-by: clemensv --- src/CloudNative.CloudEvents/CloudEvent.cs | 15 +- .../CloudEventAttributes.cs | 15 +- .../CloudEventContent.cs | 162 +++++++++++++++ .../Extensions/DistributedTracingExtension.cs | 11 + .../HttpClientExtension.cs | 184 +++++++---------- .../ICloudEventExtension.cs | 20 ++ .../ICloudEventFormatter.cs | 11 +- .../JsonEventFormatter.cs | 83 +++++++- .../ComExampleExtension1Extension.cs | 10 + .../ComExampleExtension2Extension.cs | 11 + .../HttpTest.cs | 194 ++++++++++++++++++ 11 files changed, 580 insertions(+), 136 deletions(-) create mode 100644 src/CloudNative.CloudEvents/CloudEventContent.cs create mode 100644 test/CloudNative.CloudEvents.UnitTests/HttpTest.cs diff --git a/src/CloudNative.CloudEvents/CloudEvent.cs b/src/CloudNative.CloudEvents/CloudEvent.cs index 1dd93fb..105fc18 100644 --- a/src/CloudNative.CloudEvents/CloudEvent.cs +++ b/src/CloudNative.CloudEvents/CloudEvent.cs @@ -10,6 +10,8 @@ namespace CloudNative.CloudEvents public class CloudEvent { + public const string MediaType = "application/cloudevents"; + readonly IDictionary attributes; /// @@ -33,15 +35,18 @@ namespace CloudNative.CloudEvents /// Create a new CloudEvent instance /// /// Extensions to be added to this CloudEvents - internal CloudEvent(params ICloudEventExtension[] extensions) + internal CloudEvent(IEnumerable extensions) { attributes = new CloudEventAttributes(extensions); SpecVersion = "0.1"; this.Extensions = new Dictionary(); - foreach (var extension in extensions) + if (extensions != null) { - this.Extensions.Add(extension.GetType(), extension); - extension.Attach(this); + foreach (var extension in extensions) + { + this.Extensions.Add(extension.GetType(), extension); + extension.Attach(this); + } } } @@ -72,7 +77,7 @@ namespace CloudNative.CloudEvents /// /// Extensions registered with this event. /// - protected Dictionary Extensions { get; private set; } + protected internal Dictionary Extensions { get; private set; } /// /// CloudEvent 'id' attribute. ID of the event. The semantics of this string are explicitly diff --git a/src/CloudNative.CloudEvents/CloudEventAttributes.cs b/src/CloudNative.CloudEvents/CloudEventAttributes.cs index 3ce23b9..769d373 100644 --- a/src/CloudNative.CloudEvents/CloudEventAttributes.cs +++ b/src/CloudNative.CloudEvents/CloudEventAttributes.cs @@ -29,10 +29,10 @@ namespace CloudNative.CloudEvents public const string TypeAttributeName = "type"; IDictionary dict = new Dictionary(); + + IEnumerable extensions; - ICloudEventExtension[] extensions; - - public CloudEventAttributes(params ICloudEventExtension[] extensions) + public CloudEventAttributes(IEnumerable extensions) { this.extensions = extensions; } @@ -211,11 +211,14 @@ namespace CloudNative.CloudEvents case DataAttributeName: return true; default: - foreach (var extension in extensions) + if (extensions != null) { - if (extension.ValidateAndNormalize(key, ref value)) + foreach (var extension in extensions) { - return true; + if (extension.ValidateAndNormalize(key, ref value)) + { + return true; + } } } diff --git a/src/CloudNative.CloudEvents/CloudEventContent.cs b/src/CloudNative.CloudEvents/CloudEventContent.cs new file mode 100644 index 0000000..d734f38 --- /dev/null +++ b/src/CloudNative.CloudEvents/CloudEventContent.cs @@ -0,0 +1,162 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +namespace CloudNative.CloudEvents +{ + using System; + using System.IO; + using System.Net; + using System.Net.Http; + using System.Net.Http.Headers; + using System.Text; + using System.Threading.Tasks; + + public class CloudEventContent : HttpContent + { + IInnerContent inner; + static JsonEventFormatter jsonFormatter = new JsonEventFormatter(); + + public CloudEventContent(CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter) + { + if (contentMode == ContentMode.Structured) + { + inner = new InnerByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values)); + Headers.ContentType = new MediaTypeHeaderValue(contentType.MediaType); + MapHeaders(cloudEvent); + return; + } + + if (cloudEvent.Data is byte[]) + { + inner = new InnerByteArrayContent((byte[])cloudEvent.Data); + } + else if (cloudEvent.Data is string) + { + inner = new InnerStringContent((string)cloudEvent.Data); + } + else if (cloudEvent.Data is Stream) + { + inner = new InnerStreamContent((Stream)cloudEvent.Data); + } + else + { + inner = new InnerByteArrayContent(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, + cloudEvent.Data, cloudEvent.Extensions.Values)); + } + + Headers.ContentType = new MediaTypeHeaderValue(cloudEvent.ContentType?.MediaType); + MapHeaders(cloudEvent); + } + + interface IInnerContent + { + Task InnerSerializeToStreamAsync(Stream stream, TransportContext context); + bool InnerTryComputeLength(out long length); + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + return inner.InnerSerializeToStreamAsync(stream, context); + } + + protected override bool TryComputeLength(out long length) + { + return inner.InnerTryComputeLength(out length); + } + + void MapHeaders(CloudEvent cloudEvent) + { + foreach (var attribute in cloudEvent.GetAttributes()) + { + switch (attribute.Key) + { + case CloudEventAttributes.DataAttributeName: + case CloudEventAttributes.ContentTypeAttributeName: + break; + default: + if (attribute.Value is string) + { + Headers.Add("ce-" + attribute.Key, attribute.Value.ToString()); + } + else if (attribute.Value is DateTime) + { + Headers.Add("ce-" + attribute.Key, ((DateTime)attribute.Value).ToString("o")); + } + else if (attribute.Value is Uri || attribute.Value is int) + { + Headers.Add("ce-" + attribute.Key, attribute.Value.ToString()); + } + else + { + Headers.Add("ce-" + attribute.Key, Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values))); + } + break; + } + } + } + + /// + /// This inner class is required to get around the 'protected'-ness of the + /// override functions of HttpContent for enabling containment/delegation + /// + class InnerByteArrayContent : ByteArrayContent, IInnerContent + { + public InnerByteArrayContent(byte[] content) : base(content) + { + } + + public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context) + { + return base.SerializeToStreamAsync(stream, context); + } + + public bool InnerTryComputeLength(out long length) + { + return base.TryComputeLength(out length); + } + } + + /// + /// This inner class is required to get around the 'protected'-ness of the + /// override functions of HttpContent for enabling containment/delegation + /// + class InnerStreamContent : StreamContent, IInnerContent + { + public InnerStreamContent(Stream content) : base(content) + { + } + + public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context) + { + return base.SerializeToStreamAsync(stream, context); + } + + public bool InnerTryComputeLength(out long length) + { + return base.TryComputeLength(out length); + } + } + + /// + /// This inner class is required to get around the 'protected'-ness of the + /// override functions of HttpContent for enabling containment/delegation + /// + class InnerStringContent : StringContent, IInnerContent + { + public InnerStringContent(string content) : base(content) + { + } + + public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context) + { + return base.SerializeToStreamAsync(stream, context); + } + + public bool InnerTryComputeLength(out long length) + { + return base.TryComputeLength(out length); + } + } + } +} \ No newline at end of file diff --git a/src/CloudNative.CloudEvents/Extensions/DistributedTracingExtension.cs b/src/CloudNative.CloudEvents/Extensions/DistributedTracingExtension.cs index 9d303da..fb30435 100644 --- a/src/CloudNative.CloudEvents/Extensions/DistributedTracingExtension.cs +++ b/src/CloudNative.CloudEvents/Extensions/DistributedTracingExtension.cs @@ -71,5 +71,16 @@ namespace CloudNative.CloudEvents.Extensions return false; } + + public Type GetAttributeType(string name) + { + switch (name) + { + case TraceParentAttributeName: + case TraceStateAttributeName: + return typeof(string); + } + return null; + } } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents/HttpClientExtension.cs b/src/CloudNative.CloudEvents/HttpClientExtension.cs index d20ddd6..25aeec6 100644 --- a/src/CloudNative.CloudEvents/HttpClientExtension.cs +++ b/src/CloudNative.CloudEvents/HttpClientExtension.cs @@ -14,17 +14,19 @@ namespace CloudNative.CloudEvents using System.Text; using System.Threading; using System.Threading.Tasks; + using Newtonsoft.Json; public static class HttpClientExtension { + const string HttpHeaderPrefix = "ce-"; + const string SpecVersionHttpHeader = HttpHeaderPrefix + "specversion"; static JsonEventFormatter jsonFormatter = new JsonEventFormatter(); - public static Task CopyFromAsync(this HttpListenerResponse httpListenerResponse, CloudEvent cloudEvent, ContentMode contentMode, IDictionary extraHeaders, - ICloudEventFormatter formatter) + public static Task CopyFromAsync(this HttpListenerResponse httpListenerResponse, CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter) { if (contentMode == ContentMode.Structured) { - var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType); + var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values); httpListenerResponse.ContentType = contentType.ToString(); MapAttributesToListenerResponse(cloudEvent, httpListenerResponse); return httpListenerResponse.OutputStream.WriteAsync(buffer, 0, buffer.Length); @@ -41,13 +43,42 @@ namespace CloudNative.CloudEvents } else { - stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data)); + stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data, cloudEvent.Extensions.Values)); } httpListenerResponse.ContentType = cloudEvent.ContentType.ToString(); MapAttributesToListenerResponse(cloudEvent, httpListenerResponse); return stream.CopyToAsync(httpListenerResponse.OutputStream); } + public static async Task CopyFromAsync(this HttpWebRequest httpWebRequest, CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter) + { + if (contentMode == ContentMode.Structured) + { + var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values); + httpWebRequest.ContentType = contentType.ToString(); + MapAttributesToWebRequest(cloudEvent, httpWebRequest); + await (httpWebRequest.GetRequestStream()).WriteAsync(buffer, 0, buffer.Length); + return; + } + + Stream stream; + if (cloudEvent.Data is byte[]) + { + stream = new MemoryStream((byte[])cloudEvent.Data); + } + else if (cloudEvent.Data is Stream) + { + stream = (Stream)cloudEvent.Data; + } + else + { + stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data, cloudEvent.Extensions.Values)); + } + httpWebRequest.ContentType = cloudEvent.ContentType.ToString(); + MapAttributesToWebRequest(cloudEvent, httpWebRequest); + await stream.CopyToAsync(httpWebRequest.GetRequestStream()); + } + static void MapAttributesToListenerResponse(CloudEvent cloudEvent, HttpListenerResponse httpListenerResponse) { foreach (var attribute in cloudEvent.GetAttributes()) @@ -57,8 +88,24 @@ namespace CloudNative.CloudEvents case CloudEventAttributes.ContentTypeAttributeName: break; default: - httpListenerResponse.Headers.Add("ce-" + attribute.Key, - Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value))); + httpListenerResponse.Headers.Add(HttpHeaderPrefix + attribute.Key, + Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values))); + break; + } + } + } + + static void MapAttributesToWebRequest(CloudEvent cloudEvent, HttpWebRequest httpWebRequest) + { + foreach (var attribute in cloudEvent.GetAttributes()) + { + switch (attribute.Key) + { + case CloudEventAttributes.ContentTypeAttributeName: + break; + default: + httpWebRequest.Headers.Add(HttpHeaderPrefix + attribute.Key, + Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values))); break; } } @@ -67,54 +114,8 @@ namespace CloudNative.CloudEvents public static bool HasCloudEvent(this HttpResponseMessage httpResponseMessage) { return ((httpResponseMessage.Content.Headers.ContentType != null && - httpResponseMessage.Content.Headers.ContentType.MediaType.StartsWith("application/cloudevents")) || - httpResponseMessage.Headers.Contains("ce-specversion")); - } - - public static Task PostCloudEventAsync(this HttpClient httpClient, - Uri requestUri, - CloudEvent cloudEvent, - ContentMode contentMode = ContentMode.Structured, - IDictionary extraHeaders = null, - ICloudEventFormatter formatter = null) - { - return PutPostCloudEventAsync(httpClient, httpClient.PostAsync, requestUri, cloudEvent, contentMode, - extraHeaders, formatter, CancellationToken.None); - } - - public static Task PostCloudEventAsync(this HttpClient httpClient, - Uri requestUri, - CloudEvent cloudEvent, - ContentMode contentMode, - IDictionary extraHeaders, - ICloudEventFormatter formatter, - CancellationToken cancellationToken) - { - return PutPostCloudEventAsync(httpClient, httpClient.PostAsync, requestUri, cloudEvent, contentMode, - extraHeaders, formatter, cancellationToken); - } - - public static Task PutCloudEventAsync(this HttpClient httpClient, - Uri requestUri, - CloudEvent cloudEvent, - ContentMode contentMode = ContentMode.Structured, - IDictionary extraHeaders = null, - ICloudEventFormatter formatter = null) - { - return PutPostCloudEventAsync(httpClient, httpClient.PutAsync, requestUri, cloudEvent, contentMode, - extraHeaders, formatter, CancellationToken.None); - } - - public static Task PutCloudEventAsync(this HttpClient httpClient, - Uri requestUri, - CloudEvent cloudEvent, - ContentMode contentMode, - IDictionary extraHeaders, - ICloudEventFormatter formatter, - CancellationToken cancellationToken) - { - return PutPostCloudEventAsync(httpClient, httpClient.PutAsync, requestUri, cloudEvent, contentMode, - extraHeaders, formatter, cancellationToken); + httpResponseMessage.Content.Headers.ContentType.MediaType.StartsWith(CloudEvent.MediaType)) || + httpResponseMessage.Headers.Contains(SpecVersionHttpHeader)); } public static Task ToCloudEvent(this HttpResponseMessage httpResponseMessage, @@ -134,15 +135,14 @@ namespace CloudNative.CloudEvents params ICloudEventExtension[] extensions) { if (httpListenerRequest.ContentType != null && - httpListenerRequest.ContentType.StartsWith("application/cloudevents", + httpListenerRequest.ContentType.StartsWith(CloudEvent.MediaType, StringComparison.InvariantCultureIgnoreCase)) { // handle structured mode if (formatter == null) { // if we didn't get a formatter, pick one - if (httpListenerRequest.ContentType.EndsWith("+json", - StringComparison.InvariantCultureIgnoreCase)) + if (httpListenerRequest.ContentType.EndsWith(JsonEventFormatter.MediaTypeSuffix, StringComparison.InvariantCultureIgnoreCase)) { formatter = jsonFormatter; } @@ -160,11 +160,18 @@ namespace CloudNative.CloudEvents var attributes = cloudEvent.GetAttributes(); foreach (var httpResponseHeader in httpListenerRequest.Headers.AllKeys) { - if (httpResponseHeader.StartsWith("ce-", StringComparison.InvariantCultureIgnoreCase)) + if (httpResponseHeader.StartsWith(HttpHeaderPrefix, StringComparison.InvariantCultureIgnoreCase)) { - attributes.Add( - httpResponseHeader.Substring(3).ToLowerInvariant(), - httpListenerRequest.Headers[httpResponseHeader]); + string headerValue = httpListenerRequest.Headers[httpResponseHeader]; + if (headerValue.StartsWith("{") && headerValue.EndsWith("}") || headerValue.StartsWith("[") && headerValue.EndsWith("]")) + { + attributes[httpResponseHeader.Substring(3).ToLowerInvariant()] = + JsonConvert.DeserializeObject(headerValue); + } + else + { + attributes[httpResponseHeader.Substring(3).ToLowerInvariant()] = headerValue; + } } } @@ -176,54 +183,6 @@ namespace CloudNative.CloudEvents } } - static void MapHeadersToHttpContent(CloudEvent cloudEvent, HttpContent content) - { - foreach (var attribute in cloudEvent.GetAttributes()) - { - switch (attribute.Key) - { - case CloudEventAttributes.ContentTypeAttributeName: - break; - default: - content.Headers.Add("ce-" + attribute.Key, - Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value))); - break; - } - } - } - - static Task PutPostCloudEventAsync(HttpClient httpClient, - Func> putpostFunc, - Uri requestUri, - CloudEvent cloudEvent, - ContentMode contentMode, - IDictionary extraHeaders, - ICloudEventFormatter formatter, - CancellationToken cancellationToken) - { - HttpContent content = null; - if (contentMode == ContentMode.Structured) - { - content = new ByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType)); - content.Headers.ContentType = new MediaTypeHeaderValue(contentType.ToString()); - MapHeadersToHttpContent(cloudEvent, content); - return putpostFunc(requestUri, content, cancellationToken); - } - - if (cloudEvent.Data is byte[]) - { - content = new ByteArrayContent((byte[])cloudEvent.Data); - } - else - { - content = new ByteArrayContent(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, - cloudEvent.Data)); - } - - content.Headers.ContentType = new MediaTypeHeaderValue(cloudEvent.ContentType?.MediaType); - MapHeadersToHttpContent(cloudEvent, content); - return putpostFunc(requestUri, content, cancellationToken); - } static async Task ToCloudEventInternalAsync(HttpResponseMessage httpResponseMessage, ICloudEventFormatter formatter, ICloudEventExtension[] extensions) @@ -256,11 +215,10 @@ namespace CloudNative.CloudEvents var attributes = cloudEvent.GetAttributes(); foreach (var httpResponseHeader in httpResponseMessage.Headers) { - if (httpResponseHeader.Key.StartsWith("ce-", StringComparison.InvariantCultureIgnoreCase)) + if (httpResponseHeader.Key.StartsWith(HttpHeaderPrefix, StringComparison.InvariantCultureIgnoreCase)) { - attributes.Add( - httpResponseHeader.Key.Substring(3).ToLowerInvariant(), - httpResponseHeader.Value); + attributes[httpResponseHeader.Key.Substring(3).ToLowerInvariant()] = + httpResponseHeader.Value; } } diff --git a/src/CloudNative.CloudEvents/ICloudEventExtension.cs b/src/CloudNative.CloudEvents/ICloudEventExtension.cs index c552147..e42cd1e 100644 --- a/src/CloudNative.CloudEvents/ICloudEventExtension.cs +++ b/src/CloudNative.CloudEvents/ICloudEventExtension.cs @@ -4,9 +4,29 @@ namespace CloudNative.CloudEvents { + using System; + public interface ICloudEventExtension { + /// + /// Attaches this extension instance to the given CloudEvent + /// + /// void Attach(CloudEvent cloudEvent); + /// + /// Validates the given attribute value and normalizes it if needed. + /// Normalization may include changing the data type. + /// + /// Attribute name + /// Attribute value + /// true if the attribute is handled by this extension bool ValidateAndNormalize(string key, ref dynamic value); + /// + /// Returns the CLR data type for the given attribute or NULL when + /// the attribute is not handled by this extension, + /// + /// Attribute name + /// CLR type + Type GetAttributeType(string name); } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents/ICloudEventFormatter.cs b/src/CloudNative.CloudEvents/ICloudEventFormatter.cs index 27bdceb..776837e 100644 --- a/src/CloudNative.CloudEvents/ICloudEventFormatter.cs +++ b/src/CloudNative.CloudEvents/ICloudEventFormatter.cs @@ -4,16 +4,17 @@ namespace CloudNative.CloudEvents { + using System.Collections.Generic; using System.IO; using System.Net.Mime; public interface ICloudEventFormatter { - CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions); - CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions); - byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType); - object DecodeAttribute(string name, byte[] data); - byte[] EncodeAttribute(string name, object value); + CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensions); + CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable extensions); + byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType, IEnumerable extensions); + object DecodeAttribute(string name, byte[] data, IEnumerable extensions); + byte[] EncodeAttribute(string name, object value, IEnumerable extensions); } } \ No newline at end of file diff --git a/src/CloudNative.CloudEvents/JsonEventFormatter.cs b/src/CloudNative.CloudEvents/JsonEventFormatter.cs index 23d7d1d..a2267fb 100644 --- a/src/CloudNative.CloudEvents/JsonEventFormatter.cs +++ b/src/CloudNative.CloudEvents/JsonEventFormatter.cs @@ -5,6 +5,7 @@ namespace CloudNative.CloudEvents { using System; + using System.Collections.Generic; using System.IO; using System.Net.Mime; using System.Text; @@ -13,7 +14,15 @@ namespace CloudNative.CloudEvents public class JsonEventFormatter : ICloudEventFormatter { + + public const string MediaTypeSuffix = "+json"; + public CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions) + { + return DecodeStructuredEvent(data, (IEnumerable)extensions); + } + + public CloudEvent DecodeStructuredEvent(Stream data, IEnumerable extensions = null) { var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true)); var jObject = JObject.Load(jsonReader); @@ -21,13 +30,18 @@ namespace CloudNative.CloudEvents } public CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions) + { + return DecodeStructuredEvent(data, (IEnumerable)extensions); + } + + public CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable extensions = null) { var jsonText = Encoding.UTF8.GetString(data); var jObject = JObject.Parse(jsonText); return DecodeJObject(jObject, extensions); } - public CloudEvent DecodeJObject(JObject jObject, params ICloudEventExtension[] extensions) + public CloudEvent DecodeJObject(JObject jObject, IEnumerable extensions = null) { var cloudEvent = new CloudEvent(extensions); var attributes = cloudEvent.GetAttributes(); @@ -56,14 +70,15 @@ namespace CloudNative.CloudEvents break; } } + return cloudEvent; } - public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType) + public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType, IEnumerable extensions = null) { contentType = new ContentType("application/cloudevents+json") { - CharSet = Encoding.UTF8.EncodingName + CharSet = Encoding.UTF8.WebName }; JObject jObject = new JObject(); @@ -79,17 +94,71 @@ namespace CloudNative.CloudEvents jObject[keyValuePair.Key] = JToken.FromObject(keyValuePair.Value); } } + return Encoding.UTF8.GetBytes(jObject.ToString()); } - public object DecodeAttribute(string name, byte[] data) + public object DecodeAttribute(string name, byte[] data, IEnumerable extensions = null) { - throw new NotImplementedException(); + switch (name) + { + case CloudEventAttributes.SpecVersionAttributeName: + case CloudEventAttributes.IdAttributeName: + case CloudEventAttributes.TypeAttributeName: + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string)); + case CloudEventAttributes.TimeAttributeName: + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(DateTime)); + case CloudEventAttributes.SourceAttributeName: + case CloudEventAttributes.SchemaUrlAttributeName: + var uri = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string)) as string; + return new Uri(uri); + case CloudEventAttributes.ContentTypeAttributeName: + var s = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string)) as string; + return new ContentType(s); + } + + if (extensions != null) + { + foreach (var extension in extensions) + { + Type type = extension.GetAttributeType(name); + if (type != null) + { + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), type); + } + } + } + + return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); } - public byte[] EncodeAttribute(string name, object value) + public byte[] EncodeAttribute(string name, object value, IEnumerable extensions = null) { - throw new NotImplementedException(); + if (name.Equals(CloudEventAttributes.DataAttributeName)) + { + if (value is Stream) + { + using (var buffer = new MemoryStream()) + { + ((Stream)value).CopyTo(buffer); + return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(buffer.ToArray())); + } + } + } + + if (extensions != null) + { + foreach (var extension in extensions) + { + Type type = extension.GetAttributeType(name); + if (type != null) + { + return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(Convert.ChangeType(value, type))); + } + } + } + + return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value)); } } } \ No newline at end of file diff --git a/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension1Extension.cs b/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension1Extension.cs index 1ae8a69..421a354 100644 --- a/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension1Extension.cs +++ b/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension1Extension.cs @@ -55,5 +55,15 @@ namespace CloudNative.CloudEvents.UnitTests return false; } + + public Type GetAttributeType(string name) + { + switch (name) + { + case extensionAttribute: + return typeof(string); + } + return null; + } } } \ No newline at end of file diff --git a/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension2Extension.cs b/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension2Extension.cs index ed92b50..b0f52f7 100644 --- a/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension2Extension.cs +++ b/test/CloudNative.CloudEvents.UnitTests/ComExampleExtension2Extension.cs @@ -4,6 +4,7 @@ namespace CloudNative.CloudEvents.UnitTests { + using System; using System.Collections.Generic; public class ComExampleExtension2Extension : ICloudEventExtension @@ -59,5 +60,15 @@ namespace CloudNative.CloudEvents.UnitTests return false; } + + public Type GetAttributeType(string name) + { + switch (name) + { + case extensionAttribute: + return typeof(ComExampleExtension2Data); + } + return null; + } } } \ No newline at end of file diff --git a/test/CloudNative.CloudEvents.UnitTests/HttpTest.cs b/test/CloudNative.CloudEvents.UnitTests/HttpTest.cs new file mode 100644 index 0000000..fb62910 --- /dev/null +++ b/test/CloudNative.CloudEvents.UnitTests/HttpTest.cs @@ -0,0 +1,194 @@ +// Copyright (c) Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +namespace CloudNative.CloudEvents.UnitTests +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.IO; + using System.Net; + using System.Net.Http; + using System.Net.Mime; + using System.Security.Authentication.ExtendedProtection; + using System.Threading.Tasks; + using Xunit; + + public class HttpTest : IDisposable + { + private const string listenerAddress = "http://localhost:52671/"; + private const string testContextHeader = "testcontext"; + HttpListener listener; + ConcurrentDictionary> pendingRequests = new ConcurrentDictionary>(); + + public HttpTest() + { + listener = new HttpListener() + { + AuthenticationSchemes = AuthenticationSchemes.Anonymous, + Prefixes = { listenerAddress } + }; + listener.Start(); + listener.GetContextAsync().ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + HandleContext(t.Result); + } + }); + + } + + async Task HandleContext(HttpListenerContext requestContext) + { + var ctxHeaderValue = requestContext.Request.Headers[testContextHeader]; + if (pendingRequests.TryRemove(ctxHeaderValue, out var pending)) + { + await pending(requestContext); + } +#pragma warning disable 4014 + listener.GetContextAsync().ContinueWith(t => + { + if (t.IsCompletedSuccessfully) + { + HandleContext(t.Result); + } + }); +#pragma warning restore 4014 + } + + public void Dispose() + { + listener.Stop(); + } + + [Fact] + async Task HttpStructuredClientTest() + { + var cloudEvent = new CloudEvent("com.github.pull.create", + new Uri("https://github.com/cloudevents/spec/pull/123")) + { + Id = "A234-1234-1234", + Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc), + ContentType = new ContentType(MediaTypeNames.Text.Xml), + Data = "" + }; + + var attrs = cloudEvent.GetAttributes(); + attrs["comexampleextension1"] = "value"; + attrs["comexampleextension2"] = new { othervalue = 5 }; + + + string ctx = Guid.NewGuid().ToString(); + var content = new CloudEventContent(cloudEvent, ContentMode.Structured, new JsonEventFormatter()); + content.Headers.Add(testContextHeader, ctx); + + + pendingRequests.TryAdd(ctx, async context => + { + try + { + var receivedCloudEvent = await context.Request.ToCloudEventAsync(new JsonEventFormatter()); + + Assert.Equal("0.1", receivedCloudEvent.SpecVersion); + Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); + Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source); + Assert.Equal("A234-1234-1234", receivedCloudEvent.Id); + Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(), + receivedCloudEvent.Time.Value.ToUniversalTime()); + Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType); + Assert.Equal("", receivedCloudEvent.Data); + + var attr = receivedCloudEvent.GetAttributes(); + Assert.Equal("value", (string)attr["comexampleextension1"]); + Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue); + context.Response.StatusCode = (int)HttpStatusCode.NoContent; + } + catch (Exception e) + { + using (var sw = new StreamWriter(context.Response.OutputStream)) + { + sw.Write(e.ToString()); + context.Response.StatusCode = (int)HttpStatusCode.InternalServerError; + } + } + context.Response.Close(); + }); + + var httpClient = new HttpClient(); + var result = (await httpClient.PostAsync(new Uri(listenerAddress + "ep"), content)); + if (result.StatusCode != HttpStatusCode.NoContent) + { + throw new InvalidOperationException(result.Content.ReadAsStringAsync().GetAwaiter().GetResult()); + } + } + + [Fact] + async Task HttpBinaryClientTest() + { + var cloudEvent = new CloudEvent("com.github.pull.create", + new Uri("https://github.com/cloudevents/spec/pull/123")) + { + Id = "A234-1234-1234", + Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc), + ContentType = new ContentType(MediaTypeNames.Text.Xml), + Data = "" + }; + + var attrs = cloudEvent.GetAttributes(); + attrs["comexampleextension1"] = "value"; + attrs["comexampleextension2"] = new { othervalue = 5 }; + + + string ctx = Guid.NewGuid().ToString(); + var content = new CloudEventContent(cloudEvent, ContentMode.Binary, new JsonEventFormatter()); + content.Headers.Add(testContextHeader, ctx); + + + pendingRequests.TryAdd(ctx, async context => + { + try + { + var receivedCloudEvent = await context.Request.ToCloudEventAsync(new JsonEventFormatter()); + + Assert.Equal("0.1", receivedCloudEvent.SpecVersion); + Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); + Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source); + Assert.Equal("A234-1234-1234", receivedCloudEvent.Id); + Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(), + receivedCloudEvent.Time.Value.ToUniversalTime()); + Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType); + + using (var sr = new StreamReader((Stream)receivedCloudEvent.Data)) + { + Assert.Equal("", sr.ReadToEnd()); + } + + var attr = receivedCloudEvent.GetAttributes(); + Assert.Equal("value", (string)attr["comexampleextension1"]); + Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue); + context.Response.StatusCode = (int)HttpStatusCode.NoContent; + } + catch (Exception e) + { + using (var sw = new StreamWriter(context.Response.OutputStream)) + { + sw.Write(e.ToString()); + context.Response.StatusCode = (int)HttpStatusCode.InternalServerError; + } + } + context.Response.Close(); + }); + + var httpClient = new HttpClient(); + var result = (await httpClient.PostAsync(new Uri(listenerAddress + "ep"), content)); + if (result.StatusCode != HttpStatusCode.NoContent) + { + throw new InvalidOperationException(result.Content.ReadAsStringAsync().GetAwaiter().GetResult()); + } + } + + + } +} \ No newline at end of file