AQMP test coverage and related fixes

Signed-off-by: clemensv <clemensv@microsoft.com>
This commit is contained in:
clemensv 2018-11-26 18:15:54 +01:00
parent c3effb6c3f
commit 89123179af
3 changed files with 140 additions and 11 deletions

View File

@ -5,10 +5,13 @@
namespace CloudNative.CloudEvents.Amqp
{
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.IO;
using System.Net.Mime;
using System.Threading.Tasks;
using global::Amqp;
using global::Amqp.Types;
public static class AmqpClientExtensions
{
@ -19,7 +22,7 @@ namespace CloudNative.CloudEvents.Amqp
static JsonEventFormatter jsonFormatter = new JsonEventFormatter();
public static bool IsCloudEvent(this Message message)
{
{
return ((message.Properties.ContentType != null &&
message.Properties.ContentType.ToString().StartsWith(CloudEvent.MediaType)) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader));
@ -65,8 +68,21 @@ namespace CloudNative.CloudEvents.Amqp
if (prop.Key is string &&
((string)prop.Key).StartsWith(AmqpHeaderPrefix, StringComparison.InvariantCultureIgnoreCase))
{
attributes[((string)prop.Key).Substring(AmqpHeaderPrefix.Length).ToLowerInvariant()] =
prop.Value;
if (prop.Value is Map)
{
IDictionary<string, object> exp = new ExpandoObject();
foreach (var props in (Map)prop.Value)
{
exp[props.Key.ToString()] = props.Value;
}
attributes[((string)prop.Key).Substring(AmqpHeaderPrefix.Length).ToLowerInvariant()] =
exp;
}
else
{
attributes[((string)prop.Key).Substring(AmqpHeaderPrefix.Length).ToLowerInvariant()] =
prop.Value;
}
}
}

View File

@ -4,9 +4,13 @@
namespace CloudNative.CloudEvents.Amqp
{
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using global::Amqp;
using global::Amqp.Framing;
using global::Amqp.Types;
public class AmqpCloudEventMessage : Message
{
@ -15,8 +19,9 @@ namespace CloudNative.CloudEvents.Amqp
if (contentMode == ContentMode.Structured)
{
this.BodySection = new Data
{ Binary = formatter.EncodeStructuredEvent(cloudEvent, out var contentType) };
this.Properties.ContentType = contentType.MediaType;
{ Binary = formatter.EncodeStructuredEvent(cloudEvent, out var contentType) };
this.Properties = new Properties() { ContentType = contentType.MediaType };
this.ApplicationProperties = new ApplicationProperties();
MapHeaders(cloudEvent);
return;
}
@ -43,7 +48,8 @@ namespace CloudNative.CloudEvents.Amqp
this.BodySection = new AmqpValue() { Value = cloudEvent.Data };
}
this.Properties.ContentType = cloudEvent.ContentType?.MediaType;
this.Properties = new Properties() { ContentType = cloudEvent.ContentType?.MediaType };
this.ApplicationProperties = new ApplicationProperties();
MapHeaders(cloudEvent);
}
@ -51,11 +57,32 @@ namespace CloudNative.CloudEvents.Amqp
{
foreach (var attribute in cloudEvent.GetAttributes())
{
if (attribute.Key != CloudEventAttributes.ContentTypeAttributeName &&
attribute.Key != CloudEventAttributes.DataAttributeName)
switch (attribute.Key)
{
this.ApplicationProperties.Map.Add("cloudEvents:" + attribute.Key, attribute.Value);
case CloudEventAttributes.DataAttributeName:
case CloudEventAttributes.ContentTypeAttributeName:
break;
default:
if (attribute.Value is Uri)
{
this.ApplicationProperties.Map.Add("cloudEvents:" + attribute.Key, attribute.Value.ToString());
}
else if (attribute.Value is DateTime || attribute.Value is DateTime || attribute.Value is string)
{
this.ApplicationProperties.Map.Add("cloudEvents:" + attribute.Key, attribute.Value);
}
else
{
Map dict = new Map();
foreach (PropertyDescriptor descriptor in TypeDescriptor.GetProperties(attribute.Value))
{
dict.Add(descriptor.Name,descriptor.GetValue(attribute.Value));
}
this.ApplicationProperties.Map.Add("cloudEvents:" + attribute.Key, dict);
}
break;
}
}
}
}

View File

@ -4,8 +4,94 @@
namespace CloudNative.CloudEvents.UnitTests
{
class AmqpTest
{
using System;
using System.Net.Mime;
using CloudNative.CloudEvents.Amqp;
using global::Amqp;
using Xunit;
public class AmqpTest
{
[Fact]
public void AmqpStructuredMessageTest()
{
// the AMQPNetLite library is factored such
// that we don't need to do a wire test here
var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = new CloudEvent("com.github.pull.create",
new Uri("https://github.com/cloudevents/spec/pull/123"))
{
Id = "A234-1234-1234",
Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
ContentType = new ContentType(MediaTypeNames.Text.Xml),
Data = "<much wow=\"xml\"/>"
};
var attrs = cloudEvent.GetAttributes();
attrs["comexampleextension1"] = "value";
attrs["comexampleextension2"] = new { othervalue = 5 };
var message = new AmqpCloudEventMessage(cloudEvent, ContentMode.Structured, new JsonEventFormatter());
var encodedAmqpMessage = message.Encode();
var message1 = Message.Decode(encodedAmqpMessage);
var receivedCloudEvent = message1.ToCloudEvent();
Assert.Equal("0.2", receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
receivedCloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);
var attr = receivedCloudEvent.GetAttributes();
Assert.Equal("value", (string)attr["comexampleextension1"]);
Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
}
[Fact]
public void AmqpBinaryMessageTest()
{
// the AMQPNetLite library is factored such
// that we don't need to do a wire test here
var jsonEventFormatter = new JsonEventFormatter();
var cloudEvent = new CloudEvent("com.github.pull.create",
new Uri("https://github.com/cloudevents/spec/pull/123"))
{
Id = "A234-1234-1234",
Time = new DateTime(2018, 4, 5, 17, 31, 0, DateTimeKind.Utc),
ContentType = new ContentType(MediaTypeNames.Text.Xml),
Data = "<much wow=\"xml\"/>"
};
var attrs = cloudEvent.GetAttributes();
attrs["comexampleextension1"] = "value";
attrs["comexampleextension2"] = new { othervalue = 5 };
var message = new AmqpCloudEventMessage(cloudEvent, ContentMode.Binary, new JsonEventFormatter());
var encodedAmqpMessage = message.Encode();
var message1 = Message.Decode(encodedAmqpMessage);
var receivedCloudEvent = message1.ToCloudEvent();
Assert.Equal("0.2", receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
Assert.Equal(new Uri("https://github.com/cloudevents/spec/pull/123"), receivedCloudEvent.Source);
Assert.Equal("A234-1234-1234", receivedCloudEvent.Id);
Assert.Equal(DateTime.Parse("2018-04-05T17:31:00Z").ToUniversalTime(),
receivedCloudEvent.Time.Value.ToUniversalTime());
Assert.Equal(new ContentType(MediaTypeNames.Text.Xml), receivedCloudEvent.ContentType);
Assert.Equal("<much wow=\"xml\"/>", receivedCloudEvent.Data);
var attr = receivedCloudEvent.GetAttributes();
Assert.Equal("value", (string)attr["comexampleextension1"]);
Assert.Equal(5, (int)((dynamic)attr["comexampleextension2"]).othervalue);
}
}
}