From 5b914bdf3e71f6ce4e0f4ee2521edf5182382fc5 Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Wed, 10 Mar 2021 12:45:38 +0000 Subject: [PATCH] fix: Change AMQP data handling Rather than the AMQP code "knowing" how to handle different data types, delegate that to an event formatter. There's *still* a little bit of an ugly not-quite-single-responsibility in terms of event formatters; CloudEvent formats tend to be specified in terms of structured mode messages, not binary mode. It's possible that we should really separate out "data formatting" from everything else... but it does get intertwined in structured mode formats. This change fixes #104, but we certainly want more testing and documentation for this code in general. Signed-off-by: Jon Skeet --- .../AmqpClientExtensions.cs | 53 +++++++------------ .../Amqp/AmqpTest.cs | 25 ++++++--- 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs index 8739146..a5818ca 100644 --- a/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs +++ b/src/CloudNative.CloudEvents.Amqp/AmqpClientExtensions.cs @@ -19,7 +19,7 @@ namespace CloudNative.CloudEvents.Amqp public static bool IsCloudEvent(this Message message) => HasCloudEventsContentType(message, out _) || - message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader); + message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader); public static CloudEvent ToCloudEvent(this Message message, CloudEventFormatter formatter, @@ -27,7 +27,7 @@ namespace CloudNative.CloudEvents.Amqp { if (HasCloudEventsContentType(message, out var contentType)) { - return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[])message.Body), new ContentType(contentType), extensionAttributes); + return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[]) message.Body), new ContentType(contentType), extensionAttributes); } else { @@ -44,10 +44,9 @@ namespace CloudNative.CloudEvents.Amqp var cloudEvent = new CloudEvent(version, extensionAttributes) { - Data = message.Body, DataContentType = message.Properties.ContentType }; - + foreach (var property in propertyMap) { if (!(property.Key is string key && key.StartsWith(AmqpHeaderPrefix))) @@ -55,7 +54,7 @@ namespace CloudNative.CloudEvents.Amqp continue; } string attributeName = key.Substring(AmqpHeaderPrefix.Length).ToLowerInvariant(); - + // We've already dealt with the spec version. if (attributeName == CloudEventsSpecVersion.SpecVersionAttribute.Name) { @@ -72,7 +71,7 @@ namespace CloudNative.CloudEvents.Amqp // *is* MinValue or MaxValue if we wanted to.) dt = DateTime.SpecifyKind(dt, DateTimeKind.Utc); } - cloudEvent[attributeName] = (DateTimeOffset)dt; + cloudEvent[attributeName] = (DateTimeOffset) dt; } // URIs are serialized as strings, but we need to convert them back to URIs. // It's simplest to let CloudEvent do this for us. @@ -85,6 +84,18 @@ namespace CloudNative.CloudEvents.Amqp cloudEvent[attributeName] = property.Value; } } + // Populate the data after the rest of the CloudEvent + if (message.BodySection is Data data) + { + // Note: Fetching the Binary property will always retrieve the data. It will + // be copied from the Buffer property if necessary. + formatter.DecodeBinaryModeEventData(data.Binary, cloudEvent); + } + else if (message.BodySection is object) + { + throw new ArgumentException("Binary mode data in AMQP message must be in the application data section"); + } + return cloudEvent; } } @@ -113,7 +124,7 @@ namespace CloudNative.CloudEvents.Amqp properties = new Properties { ContentType = contentType.MediaType }; break; case ContentMode.Binary: - bodySection = SerializeData(cloudEvent.Data); + bodySection = new Data { Binary = formatter.EncodeBinaryModeEventData(cloudEvent) }; properties = new Properties { ContentType = cloudEvent.DataContentType }; break; default: @@ -158,33 +169,5 @@ namespace CloudNative.CloudEvents.Amqp } return applicationProperties; } - - /// - /// Convert data into a suitable format for inclusion in an AMQP record. - /// TODO: Asynchronous version? - /// - /// - /// - private static RestrictedDescribed SerializeData(object data) - { - switch (data) - { - case null: - return null; - case byte[] bytes: - return new Data { Binary = bytes }; - case MemoryStream memoryStream: - // Note: this will return the whole stream, regardless of position... - return new Data { Binary = memoryStream.ToArray() }; - case Stream stream: - var buffer = new MemoryStream(); - stream.CopyTo(buffer); - return new Data { Binary = buffer.ToArray() }; - case string text: - return new AmqpValue { Value = text }; - default: - throw new ArgumentException($"Unsupported type for AMQP data: {data.GetType()}"); - } - } } } \ No newline at end of file diff --git a/test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs b/test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs index c20fa52..61922e9 100644 --- a/test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs +++ b/test/CloudNative.CloudEvents.UnitTests/Amqp/AmqpTest.cs @@ -3,9 +3,11 @@ // See LICENSE file in the project root for full license information. using Amqp; +using Amqp.Framing; using CloudNative.CloudEvents.NewtonsoftJson; using System; using System.Net.Mime; +using System.Text; using Xunit; using static CloudNative.CloudEvents.UnitTests.TestHelpers; @@ -68,15 +70,13 @@ namespace CloudNative.CloudEvents.Amqp.UnitTests ["comexampleextension1"] = "value" }; - // No formatter is needed for binary mode. - var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, null); + var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter()); Assert.True(message.IsCloudEvent()); var encodedAmqpMessage = message.Encode(); var message1 = Message.Decode(encodedAmqpMessage); Assert.True(message1.IsCloudEvent()); - // No formatter is needed for binary mode. - var receivedCloudEvent = message1.ToCloudEvent(null); + var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter()); Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion); Assert.Equal("com.github.pull.create", receivedCloudEvent.Type); @@ -103,13 +103,26 @@ namespace CloudNative.CloudEvents.Amqp.UnitTests Data = "" }; - var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, null); + var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter()); var encodedAmqpMessage = message.Encode(); var message1 = Message.Decode(encodedAmqpMessage); - var receivedCloudEvent = message1.ToCloudEvent(null); + var receivedCloudEvent = message1.ToCloudEvent(new JsonEventFormatter()); AssertTimestampsEqual("2018-04-05T17:31:00Z", receivedCloudEvent.Time.Value); } + + [Fact] + public void EncodeTextDataInBinaryMode_PopulatesDataProperty() + { + var cloudEvent = new CloudEvent().PopulateRequiredAttributes(); + cloudEvent.DataContentType = "text/plain"; + cloudEvent.Data = "some text"; + + var message = cloudEvent.ToAmqpMessage(ContentMode.Binary, new JsonEventFormatter()); + var body = Assert.IsType(message.BodySection); + var text = Encoding.UTF8.GetString(body.Binary); + Assert.Equal("some text", text); + } } } \ No newline at end of file