feat: Make the AMQP protocol bindings follow conventions
Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
parent
b61e639859
commit
450ba23e96
|
@ -6,11 +6,15 @@ using Amqp;
|
|||
using Amqp.Framing;
|
||||
using Amqp.Types;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using System.Net.Mime;
|
||||
|
||||
namespace CloudNative.CloudEvents.Amqp
|
||||
{
|
||||
/// <summary>
|
||||
/// Extension methods to convert between CloudEvents and AMQP messages.
|
||||
/// </summary>
|
||||
public static class AmqpClientExtensions
|
||||
{
|
||||
internal const string AmqpHeaderPrefix = "cloudEvents:";
|
||||
|
@ -21,10 +25,34 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
HasCloudEventsContentType(message, out _) ||
|
||||
message.ApplicationProperties.Map.ContainsKey(SpecVersionAmqpHeader);
|
||||
|
||||
public static CloudEvent ToCloudEvent(this Message message,
|
||||
/// <summary>
|
||||
/// Converts this AMQP message into a CloudEvent object.
|
||||
/// </summary>
|
||||
/// <param name="message">The AMQP message to convert. Must not be null.</param>
|
||||
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
|
||||
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
|
||||
/// <returns>A reference to a validated CloudEvent instance.</returns>
|
||||
public static CloudEvent ToCloudEvent(
|
||||
this Message message,
|
||||
CloudEventFormatter formatter,
|
||||
params CloudEventAttribute[] extensionAttributes)
|
||||
params CloudEventAttribute[] extensionAttributes) =>
|
||||
ToCloudEvent(message, formatter, (IEnumerable<CloudEventAttribute>) extensionAttributes);
|
||||
|
||||
/// <summary>
|
||||
/// Converts this AMQP message into a CloudEvent object.
|
||||
/// </summary>
|
||||
/// <param name="httpResponseMessage">The AMQP message to convert. Must not be null.</param>
|
||||
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
|
||||
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
|
||||
/// <returns>A reference to a validated CloudEvent instance.</returns>
|
||||
public static CloudEvent ToCloudEvent(
|
||||
this Message message,
|
||||
CloudEventFormatter formatter,
|
||||
IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||
{
|
||||
message = message ?? throw new ArgumentNullException(nameof(message));
|
||||
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
|
||||
|
||||
if (HasCloudEventsContentType(message, out var contentType))
|
||||
{
|
||||
return formatter.DecodeStructuredModeMessage(new MemoryStream((byte[]) message.Body), new ContentType(contentType), extensionAttributes);
|
||||
|
@ -32,15 +60,13 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
else
|
||||
{
|
||||
var propertyMap = message.ApplicationProperties.Map;
|
||||
if (!propertyMap.TryGetValue(SpecVersionAmqpHeader, out var versionId) || !(versionId is string versionIdText))
|
||||
if (!propertyMap.TryGetValue(SpecVersionAmqpHeader, out var versionId))
|
||||
{
|
||||
throw new ArgumentException("Request is not a CloudEvent");
|
||||
}
|
||||
var version = CloudEventsSpecVersion.FromVersionId(versionIdText);
|
||||
if (version is null)
|
||||
{
|
||||
throw new ArgumentException($"Unsupported CloudEvents spec version '{versionIdText}'");
|
||||
}
|
||||
|
||||
var version = CloudEventsSpecVersion.FromVersionId(versionId as string)
|
||||
?? throw new ArgumentException($"Unknown CloudEvents spec version '{versionId}'", nameof(message));
|
||||
|
||||
var cloudEvent = new CloudEvent(version, extensionAttributes)
|
||||
{
|
||||
|
@ -96,7 +122,7 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
throw new ArgumentException("Binary mode data in AMQP message must be in the application data section");
|
||||
}
|
||||
|
||||
return cloudEvent;
|
||||
return cloudEvent.ValidateForConversion(nameof(message));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,8 +133,18 @@ namespace CloudNative.CloudEvents.Amqp
|
|||
return contentType?.StartsWith(CloudEvent.MediaType) == true;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Converts a CloudEvent to <see cref="Message"/>.
|
||||
/// </summary>
|
||||
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
|
||||
/// <param name="contentMode">Content mode. Structured or binary.</param>
|
||||
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
|
||||
public static Message ToAmqpMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter)
|
||||
{
|
||||
cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent));
|
||||
cloudEvent.ValidateForConversion(nameof(cloudEvent));
|
||||
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
|
||||
|
||||
var applicationProperties = MapHeaders(cloudEvent);
|
||||
RestrictedDescribed bodySection;
|
||||
Properties properties;
|
||||
|
|
Loading…
Reference in New Issue