fix: Change AMQP data handling

Rather than the AMQP code "knowing" how to handle different data
types, delegate that to an event formatter.

There's *still* a little bit of an ugly
not-quite-single-responsibility in terms of event formatters;
CloudEvent formats tend to be specified in terms of structured mode
messages, not binary mode. It's possible that we should really
separate out "data formatting" from everything else... but it does
get intertwined in structured mode formats.

This change fixes #104, but we certainly want more testing and
documentation for this code in general.

Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
Jon Skeet 2021-03-10 12:45:38 +00:00 committed by Jon Skeet
parent 6656b5c123
commit 5b914bdf3e
2 changed files with 37 additions and 41 deletions

View File

@ -19,7 +19,7 @@ namespace CloudNative.CloudEvents.Amqp
public static bool IsCloudEvent(this Message message) => public static bool IsCloudEvent(this Message message) =>
HasCloudEventsContentType(message, out _) || HasCloudEventsContentType(message, out _) ||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader); message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
public static CloudEvent ToCloudEvent(this Message message, public static CloudEvent ToCloudEvent(this Message message,
CloudEventFormatter formatter, CloudEventFormatter formatter,
@ -27,7 +27,7 @@ namespace CloudNative.CloudEvents.Amqp
{ {
if (HasCloudEventsContentType(message, out var contentType)) if (HasCloudEventsContentType(message, out var contentType))
{ {
return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[])message.Body), new ContentType(contentType), extensionAttributes); return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[]) message.Body), new ContentType(contentType), extensionAttributes);
} }
else else
{ {
@ -44,10 +44,9 @@ namespace CloudNative.CloudEvents.Amqp
var cloudEvent = new CloudEvent(version, extensionAttributes) var cloudEvent = new CloudEvent(version, extensionAttributes)
{ {
Data = message.Body,
DataContentType = message.Properties.ContentType DataContentType = message.Properties.ContentType
}; };
foreach (var property in propertyMap) foreach (var property in propertyMap)
{ {
if (!(property.Key is string key && key.StartsWith(AmqpHeaderPrefix))) if (!(property.Key is string key && key.StartsWith(AmqpHeaderPrefix)))
@ -55,7 +54,7 @@ namespace CloudNative.CloudEvents.Amqp
continue; continue;
} }
string attributeName = key.Substring(AmqpHeaderPrefix.Length).ToLowerInvariant(); string attributeName = key.Substring(AmqpHeaderPrefix.Length).ToLowerInvariant();
// We've already dealt with the spec version. // We've already dealt with the spec version.
if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name) if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name)
{ {
@ -72,7 +71,7 @@ namespace CloudNative.CloudEvents.Amqp
// *is* MinValue or MaxValue if we wanted to.) // *is* MinValue or MaxValue if we wanted to.)
dt = DateTime.SpecifyKind(dt, DateTimeKind.Utc); dt = DateTime.SpecifyKind(dt, DateTimeKind.Utc);
} }
cloudEvent[attributeName] = (DateTimeOffset)dt; cloudEvent[attributeName] = (DateTimeOffset) dt;
} }
// URIs are serialized as strings, but we need to convert them back to URIs. // URIs are serialized as strings, but we need to convert them back to URIs.
// It's simplest to let CloudEvent do this for us. // It's simplest to let CloudEvent do this for us.
@ -85,6 +84,18 @@ namespace CloudNative.CloudEvents.Amqp
cloudEvent[attributeName] = property.Value; cloudEvent[attributeName] = property.Value;
} }
} }
// Populate the data after the rest of the CloudEvent
if (message.BodySection is Data data)
{
// Note: Fetching the Binary property will always retrieve the data. It will
// be copied from the Buffer property if necessary.
formatter.DecodeBinaryModeEventData(data.Binary, cloudEvent);
}
else if (message.BodySection is object)
{
throw new ArgumentException("Binary mode data in AMQP message must be in the application data section");
}
return cloudEvent; return cloudEvent;
} }
} }
@ -113,7 +124,7 @@ namespace CloudNative.CloudEvents.Amqp
properties = new Properties { ContentType = contentType.MediaType }; properties = new Properties { ContentType = contentType.MediaType };
break; break;
case ContentMode.Binary: case ContentMode.Binary:
bodySection = SerializeData(cloudEvent.Data); bodySection = new Data { Binary = formatter.EncodeBinaryModeEventData(cloudEvent) };
properties = new Properties { ContentType = cloudEvent.DataContentType }; properties = new Properties { ContentType = cloudEvent.DataContentType };
break; break;
default: default:
@ -158,33 +169,5 @@ namespace CloudNative.CloudEvents.Amqp
} }
return applicationProperties; return applicationProperties;
} }
/// <summary>
/// Convert data into a suitable format for inclusion in an AMQP record.
/// TODO: Asynchronous version?
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
private static RestrictedDescribed SerializeData(object data)
{
switch (data)
{
case null:
return null;
case byte[] bytes:
return new Data { Binary = bytes };
case MemoryStream memoryStream:
// Note: this will return the whole stream, regardless of position...
return new Data { Binary = memoryStream.ToArray() };
case Stream stream:
var buffer = new MemoryStream();
stream.CopyTo(buffer);
return new Data { Binary = buffer.ToArray() };
case string text:
return new AmqpValue { Value = text };
default:
throw new ArgumentException($"Unsupported type for AMQP data: {data.GetType()}");
}
}
} }
} }

View File

@ -3,9 +3,11 @@
// See LICENSE file in the project root for full license information. // See LICENSE file in the project root for full license information.
using Amqp; using Amqp;
using Amqp.Framing;
using CloudNative.CloudEvents.NewtonsoftJson; using CloudNative.CloudEvents.NewtonsoftJson;
using System; using System;
using System.Net.Mime; using System.Net.Mime;
using System.Text;
using Xunit; using Xunit;
using static CloudNative.CloudEvents.UnitTests.TestHelpers; using static CloudNative.CloudEvents.UnitTests.TestHelpers;
@ -68,15 +70,13 @@ namespace CloudNative.CloudEvents.Amqp.UnitTests
["comexampleextension1"] = "value" ["comexampleextension1"] = "value"
}; };
// No formatter is needed for binary mode. var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, null);
Assert.True(message.IsCloudEvent()); Assert.True(message.IsCloudEvent());
var encodedAmqpMessage = message.Encode(); var encodedAmqpMessage = message.Encode();
var message1 = Message.Decode(encodedAmqpMessage); var message1 = Message.Decode(encodedAmqpMessage);
Assert.True(message1.IsCloudEvent()); Assert.True(message1.IsCloudEvent());
// No formatter is needed for binary mode. var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());
var receivedCloudEvent = message1.ToCloudEvent(null);
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion); Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); Assert.Equal("com.github.pull.create", receivedCloudEvent.Type);
@ -103,13 +103,26 @@ namespace CloudNative.CloudEvents.Amqp.UnitTests
Data = "<much wow=\"xml\"/>" Data = "<much wow=\"xml\"/>"
}; };
var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, null); var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
var encodedAmqpMessage = message.Encode(); var encodedAmqpMessage = message.Encode();
var message1 = Message.Decode(encodedAmqpMessage); var message1 = Message.Decode(encodedAmqpMessage);
var receivedCloudEvent = message1.ToCloudEvent(null); var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter());
AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time.Value); AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time.Value);
} }
[Fact]
public void EncodeTextDataInBinaryMode_PopulatesDataProperty()
{
var cloudEvent = new CloudEvent().PopulateRequiredAttributes();
cloudEvent.DataContentType = "text/plain";
cloudEvent.Data = "some text";
var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter());
var body = Assert.IsType<Data>(message.BodySection);
var text = Encoding.UTF8.GetString(body.Binary);
Assert.Equal("some text", text);
}
} }
} }