From 32bd9c6f14c06a7b1f9186a2dd3dc5ef7f90fa5a Mon Sep 17 00:00:00 2001 From: Jon Skeet Date: Tue, 13 May 2025 11:00:16 +0100 Subject: [PATCH] Prototype of a delegating CloudEventFormatter This is the proposed (but certainly not finalized) approach for #321. --- .../CloudEventFormatters.cs | 205 ++++++++++++++++++ .../DelegatingFormatter.cs | 151 +++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 src/CloudNative.CloudEvents/CloudEventFormatters.cs create mode 100644 src/CloudNative.CloudEvents/DelegatingFormatter.cs diff --git a/src/CloudNative.CloudEvents/CloudEventFormatters.cs b/src/CloudNative.CloudEvents/CloudEventFormatters.cs new file mode 100644 index 0000000..c13e230 --- /dev/null +++ b/src/CloudNative.CloudEvents/CloudEventFormatters.cs @@ -0,0 +1,205 @@ +// Copyright 2025 Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +using CloudNative.CloudEvents.Core; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.Mime; +using System.Threading.Tasks; + +namespace CloudNative.CloudEvents; + +/// +/// Extension methods for . +/// +public static class CloudEventFormatters +{ + /// + /// + /// + /// + /// + /// + public static CloudEventFormatter WithDefaultDecodingExtensions(this CloudEventFormatter formatter, IEnumerable extensionAttributes) => + new DefaultExtensionsFormatter(formatter, extensionAttributes); + + /// + /// + /// + /// + /// + /// + public static CloudEventFormatter WithValidation(this CloudEventFormatter formatter, Action validationAction) => + WithValidation(formatter, validationAction, validationAction); + + /// + /// + /// + /// + /// + /// + /// + public static CloudEventFormatter WithValidation(this CloudEventFormatter formatter, Action? encodingValidationAction, Action? decodingValidationAction) => + new ValidatingFormatter(formatter, encodingValidationAction, decodingValidationAction); + + /// + /// Delegating formatter which applies additional validation before each encoding method, and/or after each decoding method. + /// + private class ValidatingFormatter : DelegatingFormatter + { + private readonly Action? encodingValidationAction; + private readonly Action? decodingValidationAction; + + internal ValidatingFormatter(CloudEventFormatter formatter, Action? encodingValidationAction, Action? decodingValidationAction) : base(formatter) + { + this.encodingValidationAction = encodingValidationAction; + this.decodingValidationAction = decodingValidationAction; + } + + public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) + { + var result = base.DecodeBatchModeMessage(body, contentType, extensionAttributes); + foreach (var evt in result) + { + decodingValidationAction?.Invoke(evt); + } + return result; + } + + public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) + { + var result = base.DecodeBatchModeMessage(body, contentType, extensionAttributes); + foreach (var evt in result) + { + decodingValidationAction?.Invoke(evt); + } + return result; + } + + public override async Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) + { + var result = await base.DecodeBatchModeMessageAsync(body, contentType, extensionAttributes).ConfigureAwait(false); + foreach (var evt in result) + { + decodingValidationAction?.Invoke(evt); + } + return result; + } + + public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent) + { + base.DecodeBinaryModeEventData(body, cloudEvent); + decodingValidationAction?.Invoke(cloudEvent); + } + + public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) + { + var evt = base.DecodeStructuredModeMessage(body, contentType, extensionAttributes); + decodingValidationAction?.Invoke(evt); + return evt; + } + + public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes) + { + var evt = base.DecodeStructuredModeMessage(messageBody, contentType, extensionAttributes); + decodingValidationAction?.Invoke(evt); + return evt; + } + + public override async Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) + { + var evt = await base.DecodeStructuredModeMessageAsync(body, contentType, extensionAttributes).ConfigureAwait(false); + decodingValidationAction?.Invoke(evt); + return evt; + } + + public override ReadOnlyMemory EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType) + { + // This approach ensures that we only evaluate the original sequence once, without having to materialize it. + // It does mean we could end up encoding some before aborting later (and therefore wasting effort) though. + var validating = cloudEvents.Select(evt => { encodingValidationAction?.Invoke(evt); return evt; }); + return base.EncodeBatchModeMessage(validating, out contentType); + } + + public override ReadOnlyMemory EncodeBinaryModeEventData(CloudEvent cloudEvent) + { + encodingValidationAction?.Invoke(cloudEvent); + return base.EncodeBinaryModeEventData(cloudEvent); + } + + public override ReadOnlyMemory EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) + { + encodingValidationAction?.Invoke(cloudEvent); + return base.EncodeStructuredModeMessage(cloudEvent, out contentType); + } + } + + /// + /// Delegating formatter which applies default extension attributes on all decoding method calls. + /// + private class DefaultExtensionsFormatter : DelegatingFormatter + { + private readonly IReadOnlyList defaultExtensionAttributes; + + internal DefaultExtensionsFormatter(CloudEventFormatter formatter, IEnumerable extensionAttributes) : base(formatter) + { + defaultExtensionAttributes = Validation.CheckNotNull(extensionAttributes, nameof(extensionAttributes)).ToList().AsReadOnly(); + foreach (var attribute in defaultExtensionAttributes) + { + if (!attribute.IsExtension) + { + throw new ArgumentException($"The {nameof(CloudEventAttribute.IsExtension)} must be true for all default extension attributes", nameof(extensionAttributes)); + } + } + } + + public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeBatchModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeBatchModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + public override Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeBatchModeMessageAsync(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeStructuredModeMessage(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeStructuredModeMessage(messageBody, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + public override Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + base.DecodeStructuredModeMessageAsync(body, contentType, ComputeEffectiveExtensionAttributes(extensionAttributes)); + + private IEnumerable ComputeEffectiveExtensionAttributes(IEnumerable? additionalExtensions) + { + if (additionalExtensions is null) + { + return defaultExtensionAttributes; + } + var result = new List(); + + foreach (var additional in additionalExtensions) + { + var match = defaultExtensionAttributes.FirstOrDefault(def => def.Name == additional.Name); + if (match is not null) + { + if (match != additional) + { + // TODO: Improve this message + throw new ArgumentException($"The extension attribute {match.Name} is already provided as a default, using a different object"); + } + } + else + { + result.Add(additional); + } + } + result.AddRange(defaultExtensionAttributes); + return result; + } + } +} diff --git a/src/CloudNative.CloudEvents/DelegatingFormatter.cs b/src/CloudNative.CloudEvents/DelegatingFormatter.cs new file mode 100644 index 0000000..c444a14 --- /dev/null +++ b/src/CloudNative.CloudEvents/DelegatingFormatter.cs @@ -0,0 +1,151 @@ +// Copyright 2025 Cloud Native Foundation. +// Licensed under the Apache 2.0 license. +// See LICENSE file in the project root for full license information. + +using CloudNative.CloudEvents.Core; +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Mime; +using System.Threading.Tasks; + +namespace CloudNative.CloudEvents; + +// TODO: +// - Naming: +// - Is the namespace appropriate? We have very few types in this root namespace +// - Delegating or Decorating? +// - Fill in the "CloudEvent" part, e.g. DelegatingCloudEventFormatter? +// - Expose the original formatter, perhaps in a protected way? +// - All documentation +// - We can't call original.InferContentType, because it's protected. Is that an issue? (Users can always override GetOrInferContentType instead.) +// - Are we happy with this being public? With the extension methods being public, this class doesn't have to be. +// - Are we happy with the extension methods being extension methods? + +/// +/// +/// +public abstract class DelegatingFormatter : CloudEventFormatter +{ + /// + /// The formatter to delegate to. + /// + private readonly CloudEventFormatter original; + + /// + /// + /// + /// + public DelegatingFormatter(CloudEventFormatter original) + { + this.original = Validation.CheckNotNull(original, nameof(original)); + } + + /// + /// + /// + /// + /// + /// + /// + /// + public override CloudEvent DecodeStructuredModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeStructuredModeMessage(body, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override CloudEvent DecodeStructuredModeMessage(Stream messageBody, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeStructuredModeMessage(messageBody, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override Task DecodeStructuredModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeStructuredModeMessageAsync(body, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override ReadOnlyMemory EncodeStructuredModeMessage(CloudEvent cloudEvent, out ContentType contentType) => + original.EncodeStructuredModeMessage(cloudEvent, out contentType); + + /// + /// + /// + /// + /// + /// + public override void DecodeBinaryModeEventData(ReadOnlyMemory body, CloudEvent cloudEvent) => + original.DecodeBinaryModeEventData(body, cloudEvent); + + /// + /// + /// + /// + /// + /// + public override ReadOnlyMemory EncodeBinaryModeEventData(CloudEvent cloudEvent) => + original.EncodeBinaryModeEventData(cloudEvent); + + /// + /// + /// + /// + /// + /// + /// + /// + public override IReadOnlyList DecodeBatchModeMessage(ReadOnlyMemory body, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeBatchModeMessage(body, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override IReadOnlyList DecodeBatchModeMessage(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeBatchModeMessage(body, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override Task> DecodeBatchModeMessageAsync(Stream body, ContentType? contentType, IEnumerable? extensionAttributes) => + original.DecodeBatchModeMessageAsync(body, contentType, extensionAttributes); + + /// + /// + /// + /// + /// + /// + /// + public override ReadOnlyMemory EncodeBatchModeMessage(IEnumerable cloudEvents, out ContentType contentType) => + original.EncodeBatchModeMessage(cloudEvents, out contentType); + + /// + /// + /// + /// + /// + public override string? GetOrInferDataContentType(CloudEvent cloudEvent) => + original.GetOrInferDataContentType(cloudEvent); +}