Binary and Structured JSON/HTTP support with initial test coverage

Signed-off-by: clemensv <clemensv@microsoft.com>
This commit is contained in:
clemensv 2018-11-23 16:25:48 +01:00
parent d27d743053
commit 555063c0ba
11 changed files with 580 additions and 136 deletions

View File

@ -10,6 +10,8 @@ namespace CloudNative.CloudEvents
public class CloudEvent
{
public const string MediaType = "application/cloudevents";
readonly IDictionary<string, object> attributes;
/// <summary>
@ -33,15 +35,18 @@ namespace CloudNative.CloudEvents
/// Create a new CloudEvent instance
/// </summary>
/// <param name="extensions">Extensions to be added to this CloudEvents</param>
internal CloudEvent(params ICloudEventExtension[] extensions)
internal CloudEvent(IEnumerable<ICloudEventExtension> extensions)
{
attributes = new CloudEventAttributes(extensions);
SpecVersion = "0.1";
this.Extensions = new Dictionary<Type, ICloudEventExtension>();
foreach (var extension in extensions)
if (extensions != null)
{
this.Extensions.Add(extension.GetType(), extension);
extension.Attach(this);
foreach (var extension in extensions)
{
this.Extensions.Add(extension.GetType(), extension);
extension.Attach(this);
}
}
}
@ -72,7 +77,7 @@ namespace CloudNative.CloudEvents
/// <summary>
/// Extensions registered with this event.
/// </summary>
protected Dictionary<Type, ICloudEventExtension> Extensions { get; private set; }
protected internal Dictionary<Type, ICloudEventExtension> Extensions { get; private set; }
/// <summary>
/// CloudEvent 'id' attribute. ID of the event. The semantics of this string are explicitly

View File

@ -29,10 +29,10 @@ namespace CloudNative.CloudEvents
public const string TypeAttributeName = "type";
IDictionary<string, object> dict = new Dictionary<string, object>();
IEnumerable<ICloudEventExtension> extensions;
ICloudEventExtension[] extensions;
public CloudEventAttributes(params ICloudEventExtension[] extensions)
public CloudEventAttributes(IEnumerable<ICloudEventExtension> extensions)
{
this.extensions = extensions;
}
@ -211,11 +211,14 @@ namespace CloudNative.CloudEvents
case DataAttributeName:
return true;
default:
foreach (var extension in extensions)
if (extensions != null)
{
if (extension.ValidateAndNormalize(key, ref value))
foreach (var extension in extensions)
{
return true;
if (extension.ValidateAndNormalize(key, ref value))
{
return true;
}
}
}

View File

@ -0,0 +1,162 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
namespace CloudNative.CloudEvents
{
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
public class CloudEventContent : HttpContent
{
IInnerContent inner;
static JsonEventFormatter jsonFormatter = new JsonEventFormatter();
public CloudEventContent(CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter)
{
if (contentMode == ContentMode.Structured)
{
inner = new InnerByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values));
Headers.ContentType = new MediaTypeHeaderValue(contentType.MediaType);
MapHeaders(cloudEvent);
return;
}
if (cloudEvent.Data is byte[])
{
inner = new InnerByteArrayContent((byte[])cloudEvent.Data);
}
else if (cloudEvent.Data is string)
{
inner = new InnerStringContent((string)cloudEvent.Data);
}
else if (cloudEvent.Data is Stream)
{
inner = new InnerStreamContent((Stream)cloudEvent.Data);
}
else
{
inner = new InnerByteArrayContent(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName,
cloudEvent.Data, cloudEvent.Extensions.Values));
}
Headers.ContentType = new MediaTypeHeaderValue(cloudEvent.ContentType?.MediaType);
MapHeaders(cloudEvent);
}
interface IInnerContent
{
Task InnerSerializeToStreamAsync(Stream stream, TransportContext context);
bool InnerTryComputeLength(out long length);
}
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
return inner.InnerSerializeToStreamAsync(stream, context);
}
protected override bool TryComputeLength(out long length)
{
return inner.InnerTryComputeLength(out length);
}
void MapHeaders(CloudEvent cloudEvent)
{
foreach (var attribute in cloudEvent.GetAttributes())
{
switch (attribute.Key)
{
case CloudEventAttributes.DataAttributeName:
case CloudEventAttributes.ContentTypeAttributeName:
break;
default:
if (attribute.Value is string)
{
Headers.Add("ce-" + attribute.Key, attribute.Value.ToString());
}
else if (attribute.Value is DateTime)
{
Headers.Add("ce-" + attribute.Key, ((DateTime)attribute.Value).ToString("o"));
}
else if (attribute.Value is Uri || attribute.Value is int)
{
Headers.Add("ce-" + attribute.Key, attribute.Value.ToString());
}
else
{
Headers.Add("ce-" + attribute.Key, Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values)));
}
break;
}
}
}
/// <summary>
/// This inner class is required to get around the 'protected'-ness of the
/// override functions of HttpContent for enabling containment/delegation
/// </summary>
class InnerByteArrayContent : ByteArrayContent, IInnerContent
{
public InnerByteArrayContent(byte[] content) : base(content)
{
}
public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
{
return base.SerializeToStreamAsync(stream, context);
}
public bool InnerTryComputeLength(out long length)
{
return base.TryComputeLength(out length);
}
}
/// <summary>
/// This inner class is required to get around the 'protected'-ness of the
/// override functions of HttpContent for enabling containment/delegation
/// </summary>
class InnerStreamContent : StreamContent, IInnerContent
{
public InnerStreamContent(Stream content) : base(content)
{
}
public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
{
return base.SerializeToStreamAsync(stream, context);
}
public bool InnerTryComputeLength(out long length)
{
return base.TryComputeLength(out length);
}
}
/// <summary>
/// This inner class is required to get around the 'protected'-ness of the
/// override functions of HttpContent for enabling containment/delegation
/// </summary>
class InnerStringContent : StringContent, IInnerContent
{
public InnerStringContent(string content) : base(content)
{
}
public Task InnerSerializeToStreamAsync(Stream stream, TransportContext context)
{
return base.SerializeToStreamAsync(stream, context);
}
public bool InnerTryComputeLength(out long length)
{
return base.TryComputeLength(out length);
}
}
}
}

View File

@ -71,5 +71,16 @@ namespace CloudNative.CloudEvents.Extensions
return false;
}
public Type GetAttributeType(string name)
{
switch (name)
{
case TraceParentAttributeName:
case TraceStateAttributeName:
return typeof(string);
}
return null;
}
}
}

View File

@ -14,17 +14,19 @@ namespace CloudNative.CloudEvents
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
public static class HttpClientExtension
{
const string HttpHeaderPrefix = "ce-";
const string SpecVersionHttpHeader = HttpHeaderPrefix + "specversion";
static JsonEventFormatter jsonFormatter = new JsonEventFormatter();
public static Task CopyFromAsync(this HttpListenerResponse httpListenerResponse, CloudEvent cloudEvent, ContentMode contentMode, IDictionary<string, string> extraHeaders,
ICloudEventFormatter formatter)
public static Task CopyFromAsync(this HttpListenerResponse httpListenerResponse, CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter)
{
if (contentMode == ContentMode.Structured)
{
var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType);
var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values);
httpListenerResponse.ContentType = contentType.ToString();
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
return httpListenerResponse.OutputStream.WriteAsync(buffer, 0, buffer.Length);
@ -41,13 +43,42 @@ namespace CloudNative.CloudEvents
}
else
{
stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data));
stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data, cloudEvent.Extensions.Values));
}
httpListenerResponse.ContentType = cloudEvent.ContentType.ToString();
MapAttributesToListenerResponse(cloudEvent, httpListenerResponse);
return stream.CopyToAsync(httpListenerResponse.OutputStream);
}
public static async Task CopyFromAsync(this HttpWebRequest httpWebRequest, CloudEvent cloudEvent, ContentMode contentMode, ICloudEventFormatter formatter)
{
if (contentMode == ContentMode.Structured)
{
var buffer = formatter.EncodeStructuredEvent(cloudEvent, out var contentType, cloudEvent.Extensions.Values);
httpWebRequest.ContentType = contentType.ToString();
MapAttributesToWebRequest(cloudEvent, httpWebRequest);
await (httpWebRequest.GetRequestStream()).WriteAsync(buffer, 0, buffer.Length);
return;
}
Stream stream;
if (cloudEvent.Data is byte[])
{
stream = new MemoryStream((byte[])cloudEvent.Data);
}
else if (cloudEvent.Data is Stream)
{
stream = (Stream)cloudEvent.Data;
}
else
{
stream = new MemoryStream(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName, cloudEvent.Data, cloudEvent.Extensions.Values));
}
httpWebRequest.ContentType = cloudEvent.ContentType.ToString();
MapAttributesToWebRequest(cloudEvent, httpWebRequest);
await stream.CopyToAsync(httpWebRequest.GetRequestStream());
}
static void MapAttributesToListenerResponse(CloudEvent cloudEvent, HttpListenerResponse httpListenerResponse)
{
foreach (var attribute in cloudEvent.GetAttributes())
@ -57,8 +88,24 @@ namespace CloudNative.CloudEvents
case CloudEventAttributes.ContentTypeAttributeName:
break;
default:
httpListenerResponse.Headers.Add("ce-" + attribute.Key,
Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value)));
httpListenerResponse.Headers.Add(HttpHeaderPrefix + attribute.Key,
Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values)));
break;
}
}
}
static void MapAttributesToWebRequest(CloudEvent cloudEvent, HttpWebRequest httpWebRequest)
{
foreach (var attribute in cloudEvent.GetAttributes())
{
switch (attribute.Key)
{
case CloudEventAttributes.ContentTypeAttributeName:
break;
default:
httpWebRequest.Headers.Add(HttpHeaderPrefix + attribute.Key,
Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value, cloudEvent.Extensions.Values)));
break;
}
}
@ -67,54 +114,8 @@ namespace CloudNative.CloudEvents
public static bool HasCloudEvent(this HttpResponseMessage httpResponseMessage)
{
return ((httpResponseMessage.Content.Headers.ContentType != null &&
httpResponseMessage.Content.Headers.ContentType.MediaType.StartsWith("application/cloudevents")) ||
httpResponseMessage.Headers.Contains("ce-specversion"));
}
public static Task<HttpResponseMessage> PostCloudEventAsync(this HttpClient httpClient,
Uri requestUri,
CloudEvent cloudEvent,
ContentMode contentMode = ContentMode.Structured,
IDictionary<string, string> extraHeaders = null,
ICloudEventFormatter formatter = null)
{
return PutPostCloudEventAsync(httpClient, httpClient.PostAsync, requestUri, cloudEvent, contentMode,
extraHeaders, formatter, CancellationToken.None);
}
public static Task<HttpResponseMessage> PostCloudEventAsync(this HttpClient httpClient,
Uri requestUri,
CloudEvent cloudEvent,
ContentMode contentMode,
IDictionary<string, string> extraHeaders,
ICloudEventFormatter formatter,
CancellationToken cancellationToken)
{
return PutPostCloudEventAsync(httpClient, httpClient.PostAsync, requestUri, cloudEvent, contentMode,
extraHeaders, formatter, cancellationToken);
}
public static Task<HttpResponseMessage> PutCloudEventAsync(this HttpClient httpClient,
Uri requestUri,
CloudEvent cloudEvent,
ContentMode contentMode = ContentMode.Structured,
IDictionary<string, string> extraHeaders = null,
ICloudEventFormatter formatter = null)
{
return PutPostCloudEventAsync(httpClient, httpClient.PutAsync, requestUri, cloudEvent, contentMode,
extraHeaders, formatter, CancellationToken.None);
}
public static Task<HttpResponseMessage> PutCloudEventAsync(this HttpClient httpClient,
Uri requestUri,
CloudEvent cloudEvent,
ContentMode contentMode,
IDictionary<string, string> extraHeaders,
ICloudEventFormatter formatter,
CancellationToken cancellationToken)
{
return PutPostCloudEventAsync(httpClient, httpClient.PutAsync, requestUri, cloudEvent, contentMode,
extraHeaders, formatter, cancellationToken);
httpResponseMessage.Content.Headers.ContentType.MediaType.StartsWith(CloudEvent.MediaType)) ||
httpResponseMessage.Headers.Contains(SpecVersionHttpHeader));
}
public static Task<CloudEvent> ToCloudEvent(this HttpResponseMessage httpResponseMessage,
@ -134,15 +135,14 @@ namespace CloudNative.CloudEvents
params ICloudEventExtension[] extensions)
{
if (httpListenerRequest.ContentType != null &&
httpListenerRequest.ContentType.StartsWith("application/cloudevents",
httpListenerRequest.ContentType.StartsWith(CloudEvent.MediaType,
StringComparison.InvariantCultureIgnoreCase))
{
// handle structured mode
if (formatter == null)
{
// if we didn't get a formatter, pick one
if (httpListenerRequest.ContentType.EndsWith("+json",
StringComparison.InvariantCultureIgnoreCase))
if (httpListenerRequest.ContentType.EndsWith(JsonEventFormatter.MediaTypeSuffix, StringComparison.InvariantCultureIgnoreCase))
{
formatter = jsonFormatter;
}
@ -160,11 +160,18 @@ namespace CloudNative.CloudEvents
var attributes = cloudEvent.GetAttributes();
foreach (var httpResponseHeader in httpListenerRequest.Headers.AllKeys)
{
if (httpResponseHeader.StartsWith("ce-", StringComparison.InvariantCultureIgnoreCase))
if (httpResponseHeader.StartsWith(HttpHeaderPrefix, StringComparison.InvariantCultureIgnoreCase))
{
attributes.Add(
httpResponseHeader.Substring(3).ToLowerInvariant(),
httpListenerRequest.Headers[httpResponseHeader]);
string headerValue = httpListenerRequest.Headers[httpResponseHeader];
if (headerValue.StartsWith("{") && headerValue.EndsWith("}") || headerValue.StartsWith("[") && headerValue.EndsWith("]"))
{
attributes[httpResponseHeader.Substring(3).ToLowerInvariant()] =
JsonConvert.DeserializeObject(headerValue);
}
else
{
attributes[httpResponseHeader.Substring(3).ToLowerInvariant()] = headerValue;
}
}
}
@ -176,54 +183,6 @@ namespace CloudNative.CloudEvents
}
}
static void MapHeadersToHttpContent(CloudEvent cloudEvent, HttpContent content)
{
foreach (var attribute in cloudEvent.GetAttributes())
{
switch (attribute.Key)
{
case CloudEventAttributes.ContentTypeAttributeName:
break;
default:
content.Headers.Add("ce-" + attribute.Key,
Encoding.UTF8.GetString(jsonFormatter.EncodeAttribute(attribute.Key, attribute.Value)));
break;
}
}
}
static Task<HttpResponseMessage> PutPostCloudEventAsync(HttpClient httpClient,
Func<Uri, HttpContent, CancellationToken, Task<HttpResponseMessage>> putpostFunc,
Uri requestUri,
CloudEvent cloudEvent,
ContentMode contentMode,
IDictionary<string, string> extraHeaders,
ICloudEventFormatter formatter,
CancellationToken cancellationToken)
{
HttpContent content = null;
if (contentMode == ContentMode.Structured)
{
content = new ByteArrayContent(formatter.EncodeStructuredEvent(cloudEvent, out var contentType));
content.Headers.ContentType = new MediaTypeHeaderValue(contentType.ToString());
MapHeadersToHttpContent(cloudEvent, content);
return putpostFunc(requestUri, content, cancellationToken);
}
if (cloudEvent.Data is byte[])
{
content = new ByteArrayContent((byte[])cloudEvent.Data);
}
else
{
content = new ByteArrayContent(formatter.EncodeAttribute(CloudEventAttributes.DataAttributeName,
cloudEvent.Data));
}
content.Headers.ContentType = new MediaTypeHeaderValue(cloudEvent.ContentType?.MediaType);
MapHeadersToHttpContent(cloudEvent, content);
return putpostFunc(requestUri, content, cancellationToken);
}
static async Task<CloudEvent> ToCloudEventInternalAsync(HttpResponseMessage httpResponseMessage,
ICloudEventFormatter formatter, ICloudEventExtension[] extensions)
@ -256,11 +215,10 @@ namespace CloudNative.CloudEvents
var attributes = cloudEvent.GetAttributes();
foreach (var httpResponseHeader in httpResponseMessage.Headers)
{
if (httpResponseHeader.Key.StartsWith("ce-", StringComparison.InvariantCultureIgnoreCase))
if (httpResponseHeader.Key.StartsWith(HttpHeaderPrefix, StringComparison.InvariantCultureIgnoreCase))
{
attributes.Add(
httpResponseHeader.Key.Substring(3).ToLowerInvariant(),
httpResponseHeader.Value);
attributes[httpResponseHeader.Key.Substring(3).ToLowerInvariant()] =
httpResponseHeader.Value;
}
}

View File

@ -4,9 +4,29 @@
namespace CloudNative.CloudEvents
{
using System;
public interface ICloudEventExtension
{
/// <summary>
/// Attaches this extension instance to the given CloudEvent
/// </summary>
/// <param name="cloudEvent"></param>
void Attach(CloudEvent cloudEvent);
/// <summary>
/// Validates the given attribute value and normalizes it if needed.
/// Normalization may include changing the data type.
/// </summary>
/// <param name="key">Attribute name</param>
/// <param name="value">Attribute value</param>
/// <returns>true if the attribute is handled by this extension</returns>
bool ValidateAndNormalize(string key, ref dynamic value);
/// <summary>
/// Returns the CLR data type for the given attribute or NULL when
/// the attribute is not handled by this extension,
/// </summary>
/// <param name="name">Attribute name</param>
/// <returns>CLR type</returns>
Type GetAttributeType(string name);
}
}

View File

@ -4,16 +4,17 @@
namespace CloudNative.CloudEvents
{
using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
public interface ICloudEventFormatter
{
CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions);
CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions);
byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType);
object DecodeAttribute(string name, byte[] data);
byte[] EncodeAttribute(string name, object value);
CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<ICloudEventExtension> extensions);
CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<ICloudEventExtension> extensions);
byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType, IEnumerable<ICloudEventExtension> extensions);
object DecodeAttribute(string name, byte[] data, IEnumerable<ICloudEventExtension> extensions);
byte[] EncodeAttribute(string name, object value, IEnumerable<ICloudEventExtension> extensions);
}
}

View File

@ -5,6 +5,7 @@
namespace CloudNative.CloudEvents
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Mime;
using System.Text;
@ -13,7 +14,15 @@ namespace CloudNative.CloudEvents
public class JsonEventFormatter : ICloudEventFormatter
{
public const string MediaTypeSuffix = "+json";
public CloudEvent DecodeStructuredEvent(Stream data, params ICloudEventExtension[] extensions)
{
return DecodeStructuredEvent(data, (IEnumerable<ICloudEventExtension>)extensions);
}
public CloudEvent DecodeStructuredEvent(Stream data, IEnumerable<ICloudEventExtension> extensions = null)
{
var jsonReader = new JsonTextReader(new StreamReader(data, Encoding.UTF8, true, 8192, true));
var jObject = JObject.Load(jsonReader);
@ -21,13 +30,18 @@ namespace CloudNative.CloudEvents
}
public CloudEvent DecodeStructuredEvent(byte[] data, params ICloudEventExtension[] extensions)
{
return DecodeStructuredEvent(data, (IEnumerable<ICloudEventExtension>)extensions);
}
public CloudEvent DecodeStructuredEvent(byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
var jsonText = Encoding.UTF8.GetString(data);
var jObject = JObject.Parse(jsonText);
return DecodeJObject(jObject, extensions);
}
public CloudEvent DecodeJObject(JObject jObject, params ICloudEventExtension[] extensions)
public CloudEvent DecodeJObject(JObject jObject, IEnumerable<ICloudEventExtension> extensions = null)
{
var cloudEvent = new CloudEvent(extensions);
var attributes = cloudEvent.GetAttributes();
@ -56,14 +70,15 @@ namespace CloudNative.CloudEvents
break;
}
}
return cloudEvent;
}
public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType)
public byte[] EncodeStructuredEvent(CloudEvent cloudEvent, out ContentType contentType, IEnumerable<ICloudEventExtension> extensions = null)
{
contentType = new ContentType("application/cloudevents+json")
{
CharSet = Encoding.UTF8.EncodingName
CharSet = Encoding.UTF8.WebName
};
JObject jObject = new JObject();
@ -79,17 +94,71 @@ namespace CloudNative.CloudEvents
jObject[keyValuePair.Key] = JToken.FromObject(keyValuePair.Value);
}
}
return Encoding.UTF8.GetBytes(jObject.ToString());
}
public object DecodeAttribute(string name, byte[] data)
public object DecodeAttribute(string name, byte[] data, IEnumerable<ICloudEventExtension> extensions = null)
{
throw new NotImplementedException();
switch (name)
{
case CloudEventAttributes.SpecVersionAttributeName:
case CloudEventAttributes.IdAttributeName:
case CloudEventAttributes.TypeAttributeName:
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string));
case CloudEventAttributes.TimeAttributeName:
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(DateTime));
case CloudEventAttributes.SourceAttributeName:
case CloudEventAttributes.SchemaUrlAttributeName:
var uri = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string)) as string;
return new Uri(uri);
case CloudEventAttributes.ContentTypeAttributeName:
var s = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), typeof(string)) as string;
return new ContentType(s);
}
if (extensions != null)
{
foreach (var extension in extensions)
{
Type type = extension.GetAttributeType(name);
if (type != null)
{
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), type);
}
}
}
return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data));
}
public byte[] EncodeAttribute(string name, object value)
public byte[] EncodeAttribute(string name, object value, IEnumerable<ICloudEventExtension> extensions = null)
{
throw new NotImplementedException();
if (name.Equals(CloudEventAttributes.DataAttributeName))
{
if (value is Stream)
{
using (var buffer = new MemoryStream())
{
((Stream)value).CopyTo(buffer);
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(buffer.ToArray()));
}
}
}
if (extensions != null)
{
foreach (var extension in extensions)
{
Type type = extension.GetAttributeType(name);
if (type != null)
{
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(Convert.ChangeType(value, type)));
}
}
}
return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value));
}
}
}

View File

@ -55,5 +55,15 @@ namespace CloudNative.CloudEvents.UnitTests
return false;
}
public Type GetAttributeType(string name)
{
switch (name)
{
case extensionAttribute:
return typeof(string);
}
return null;
}
}
}

View File

@ -4,6 +4,7 @@
namespace CloudNative.CloudEvents.UnitTests
{
using System;
using System.Collections.Generic;
public class ComExampleExtension2Extension : ICloudEventExtension
@ -59,5 +60,15 @@ namespace CloudNative.CloudEvents.UnitTests
return false;
}
public Type GetAttributeType(string name)
{
switch (name)
{
case extensionAttribute:
return typeof(ComExampleExtension2Data);
}
return null;
}
}
}

View File

@ -0,0 +1,194 @@
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
namespace CloudNative.CloudEvents.UnitTests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Mime;
using System.Security.Authentication.ExtendedProtection;
using System.Threading.Tasks;
using Xunit;
public class HttpTest : IDisposable
{
private const string listenerAddress = "http://localhost:52671/";
private const string testContextHeader = "testcontext";
HttpListener listener;
ConcurrentDictionary<string, Func<HttpListenerContext, Task>> pendingRequests = new ConcurrentDictionary<string, Func<HttpListenerContext, Task>>();
public HttpTest()
{
listener = new HttpListener()
{
AuthenticationSchemes = AuthenticationSchemes.Anonymous,
Prefixes = { listenerAddress }
};
listener.Start();
listener.GetContextAsync().ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
HandleContext(t.Result);
}
});
}
async Task HandleContext(HttpListenerContext requestContext)
{
var ctxHeaderValue = requestContext.Request.Headers[testContextHeader];
if (pendingRequests.TryRemove(ctxHeaderValue, out var pending))
{
await pending(requestContext);
}
#pragma warning disable 4014
listener.GetContextAsync().ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
HandleContext(t.Result);
}
});
#pragma warning restore 4014
}
public void Dispose()
{
listener.Stop();
}
[Fact]
async Task HttpStructuredClientTest()
{
var cloudEvent = new CloudEvent("com.github.pull.create",
new Uri("https://github.com/cloudevents/spec/pull/123"))
{
Id = "A234-1234-1234",
Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
ContentType = new ContentType(MediaTypeNames.Text.Xml),
Data = "<much wow=\"xml\"/>"
};
var attrs = cloudEvent.GetAttributes();
attrs["comexampleextension1"] = "value";
attrs["comexampleextension2"] = new { othervalue = 5 };
string ctx = Guid.NewGuid().ToString();
var content = new CloudEventContent(cloudEvent, ContentMode.Structured, new JsonEventFormatter());
content.Headers.Add(testContextHeader, ctx);
pendingRequests.TryAdd(ctx, async context =>
{
try
{
var receivedCloudEvent = await context.Request.ToCloudEventAsync(new JsonEventFormatter());
Assert.Equal("0.1", receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
receivedCloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);
var attr = receivedCloudEvent.GetAttributes();
Assert.Equal("value", (string)attr["comexampleextension1"]);
Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
context.Response.StatusCode = (int)HttpStatusCode.NoContent;
}
catch (Exception e)
{
using (var sw = new StreamWriter(context.Response.OutputStream))
{
sw.Write(e.ToString());
context.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
}
}
context.Response.Close();
});
var httpClient = new HttpClient();
var result = (await httpClient.PostAsync(new Uri(listenerAddress + "ep"), content));
if (result.StatusCode != HttpStatusCode.NoContent)
{
throw new InvalidOperationException(result.Content.ReadAsStringAsync().GetAwaiter().GetResult());
}
}
[Fact]
async Task HttpBinaryClientTest()
{
var cloudEvent = new CloudEvent("com.github.pull.create",
new Uri("https://github.com/cloudevents/spec/pull/123"))
{
Id = "A234-1234-1234",
Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
ContentType = new ContentType(MediaTypeNames.Text.Xml),
Data = "<much wow=\"xml\"/>"
};
var attrs = cloudEvent.GetAttributes();
attrs["comexampleextension1"] = "value";
attrs["comexampleextension2"] = new { othervalue = 5 };
string ctx = Guid.NewGuid().ToString();
var content = new CloudEventContent(cloudEvent, ContentMode.Binary, new JsonEventFormatter());
content.Headers.Add(testContextHeader, ctx);
pendingRequests.TryAdd(ctx, async context =>
{
try
{
var receivedCloudEvent = await context.Request.ToCloudEventAsync(new JsonEventFormatter());
Assert.Equal("0.1", receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
receivedCloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType);
using (var sr = new StreamReader((Stream)receivedCloudEvent.Data))
{
Assert.Equal("<much wow=\"xml\"/>", sr.ReadToEnd());
}
var attr = receivedCloudEvent.GetAttributes();
Assert.Equal("value", (string)attr["comexampleextension1"]);
Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
context.Response.StatusCode = (int)HttpStatusCode.NoContent;
}
catch (Exception e)
{
using (var sw = new StreamWriter(context.Response.OutputStream))
{
sw.Write(e.ToString());
context.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
}
}
context.Response.Close();
});
var httpClient = new HttpClient();
var result = (await httpClient.PostAsync(new Uri(listenerAddress + "ep"), content));
if (result.StatusCode != HttpStatusCode.NoContent)
{
throw new InvalidOperationException(result.Content.ReadAsStringAsync().GetAwaiter().GetResult());
}
}
}
}